From a4346f1ee87f221d8541ad31b2efb3bba41a4df4 Mon Sep 17 00:00:00 2001 From: pschwan Date: Tue, 11 Mar 2003 23:37:27 +0000 Subject: [PATCH] merge b_devel into HEAD. Includes: - client-side I/O cache - O_DIRECT fixes for IA64 - liblustre improvements - various small bug fixes --- lustre/ChangeLog | 9 +- lustre/Makefile.am | 2 +- lustre/conf/slapd-lustre.conf | 2 +- lustre/configure.in | 24 +- lustre/extN/ext3-2.5-noread.diff | 266 +++++++++++++ lustre/extN/extN-noread.diff | 20 +- lustre/include/liblustre.h | 5 +- lustre/include/linux/lprocfs_status.h | 6 +- lustre/include/linux/lustre_dlm.h | 8 +- lustre/include/linux/lustre_idl.h | 3 +- lustre/include/linux/lustre_lib.h | 62 +-- lustre/include/linux/lustre_lite.h | 42 ++- lustre/include/linux/lustre_mds.h | 2 +- lustre/include/linux/lustre_net.h | 4 +- lustre/include/linux/obd_class.h | 29 +- lustre/include/linux/obd_ost.h | 10 +- lustre/include/linux/obd_ptlbd.h | 2 +- lustre/include/linux/obd_support.h | 14 +- .../patches/iod-stock-24-exports_hp.patch | 41 ++ lustre/kernel_patches/series/hp-pnnl | 1 + lustre/kernel_patches/series/rh-8.0 | 9 - lustre/ldlm/ldlm_lockd.c | 125 ++++--- lustre/lib/obd_pack.c | 47 +-- lustre/lib/target.c | 7 +- lustre/liblustre/Makefile.am | 8 +- lustre/liblustre/libtest.c | 2 +- lustre/llite/Makefile.am | 2 +- lustre/llite/commit_callback.c | 15 +- lustre/llite/file.c | 373 ++++++++++-------- lustre/llite/iod.c | 415 +++++++++++++++++++++ lustre/llite/rw.c | 393 ++++++++++--------- lustre/llite/super.c | 52 ++- lustre/llite/super25.c | 38 +- lustre/lov/lov_obd.c | 32 +- lustre/mds/mds_fs.c | 3 + lustre/obdclass/class_obd.c | 29 +- lustre/obdclass/genops.c | 10 +- lustre/obdecho/echo_client.c | 4 +- lustre/obdfilter/Makefile.am | 1 - lustre/obdfilter/filter.c | 214 ++++++----- lustre/osc/osc_request.c | 141 ++++--- lustre/ost/ost_handler.c | 127 ++++--- lustre/ptlbd/blk.c | 11 +- lustre/ptlbd/client.c | 11 +- lustre/ptlbd/main.c | 1 + lustre/ptlbd/rpc.c | 66 ++-- lustre/ptlbd/server.c 
| 7 +- lustre/ptlrpc/client.c | 19 +- lustre/ptlrpc/niobuf.c | 33 +- lustre/ptlrpc/rpc.c | 3 +- lustre/ptlrpc/service.c | 15 +- lustre/scripts/lustre.spec.in | 7 + lustre/tests/ba-echo.sh | 8 +- lustre/tests/directio.c | 14 +- lustre/tests/lkcdmap | 10 +- lustre/tests/llechocleanup.sh | 2 +- lustre/tests/openclose.c | 9 +- lustre/tests/runas.c | 133 +++++++ lustre/tests/sanity.sh | 3 +- lustre/tests/sanityN.sh | 29 +- lustre/tests/test_brw.c | 51 ++- lustre/utils/lctl.c | 2 +- lustre/utils/obd.c | 2 +- lustre/utils/obdbarrier.c | 88 ++--- lustre/utils/obdio.c | 42 +-- lustre/utils/obdiolib.c | 400 ++++++++++---------- 66 files changed, 2394 insertions(+), 1171 deletions(-) create mode 100644 lustre/extN/ext3-2.5-noread.diff create mode 100644 lustre/kernel_patches/patches/iod-stock-24-exports_hp.patch delete mode 100644 lustre/kernel_patches/series/rh-8.0 create mode 100644 lustre/llite/iod.c create mode 100644 lustre/tests/runas.c diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 61193c7..97789a8 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -1,5 +1,5 @@ -TBD - * version v0_5_21 +2003-03-11 Phil Schwan + * version v0_6 * bug fixes - LDLM_DEBUG macro fix, for gcc 3.2 (850) - failed open()s could cause deadlock; fixed (867, 869) @@ -17,6 +17,11 @@ TBD - if a bad lock AST arrives, send an error instead of dropping entirely - return 0 from revalidate2 if ll_intent_lock returns -EINTR (912) - fix leak in bulk IO when only partially completed (899, 900, 926) + - fix O_DIRECT for ia64 (55) + - (almost) eliminate Lustre-kernel-thread effects on load average (722) + - C-z after timeout could hang a process forever; fixed (977) + * Features + - client-side I/O cache (678, 924, 929, 941, 970) * protocol changes - READPAGE and SETATTRs which don't take server-side locks get their own portal diff --git a/lustre/Makefile.am b/lustre/Makefile.am index 9f837ad..7ad7358 100644 --- a/lustre/Makefile.am +++ b/lustre/Makefile.am @@ -22,7 +22,7 @@ SUBDIRS = $(DIRS24) 
obdclass mds utils ptlrpc ldlm lib obdfilter mdc osc ost SUBDIRS+= llite obdecho lov cobd tests doc scripts conf endif -DIST_SUBDIRS = $(SUBDIRS) +DIST_SUBDIRS = $(SUBDIRS) liblustre EXTRA_DIST = BUGS FDL Rules include archdep.m4 kernel_patches # We get the version from the spec file. diff --git a/lustre/conf/slapd-lustre.conf b/lustre/conf/slapd-lustre.conf index de89c76..7906908 100644 --- a/lustre/conf/slapd-lustre.conf +++ b/lustre/conf/slapd-lustre.conf @@ -1,6 +1,6 @@ ####################################################################### # lustre ldap config database -# $Id: slapd-lustre.conf,v 1.2 2003/01/06 22:17:53 adilger Exp $ +# $Id: slapd-lustre.conf,v 1.3 2003/03/11 23:36:45 pschwan Exp $ ####################################################################### database ldbm diff --git a/lustre/configure.in b/lustre/configure.in index 6384d30..5c5f438 100644 --- a/lustre/configure.in +++ b/lustre/configure.in @@ -57,13 +57,29 @@ fi AC_SUBST(LIBREADLINE) AC_SUBST(HAVE_LIBREADLINE) +AC_ARG_ENABLE(efence, [ --enable-efence use efence library],, + enable_efence="no") + +if test "$enable_efence" = "yes" ; then + LIBEFENCE="-lefence" + HAVE_LIBEFENCE="-DHAVE_LIBEFENCE=1" +else + LIBEFENCE="" + HAVE_LIBEFENCE="" +fi +AC_SUBST(LIBEFENCE) +AC_SUBST(HAVE_LIBEFENCE) + # XXX this should be a runtime option +AC_MSG_CHECKING(if you are enabling OST recovery...) 
AC_ARG_ENABLE(ost_recovery, [ --enable-ost-recovery: enable support for ost recovery],, - enable_ost_recovery="yes") + enable_ost_recovery="no") if test "$enable_ost_recovery" = "yes" ; then ENABLE_OST_RECOVERY="-DOST_RECOVERY=1" + AC_MSG_RESULT(yes) else - HAVE_LIBREADLINE="" + ENABLE_OST_RECOVERY="" + AC_MSG_RESULT(no) fi AC_SUBST(ENABLE_OST_RECOVERY) @@ -131,7 +147,7 @@ KINCFLAGS='-I$(top_srcdir)/include -I$(PORTALS)/include -I$(LINUX)/include' else KINCFLAGS='-I$(top_srcdir)/include -I$(PORTALS)/include' fi -CPPFLAGS="$KINCFLAGS $ARCHCPPFLAGS" +CPPFLAGS="$KINCFLAGS $ARCHCPPFLAGS $ENABLE_OST_RECOVERY" if test $host_cpu != "lib" ; then AC_MSG_CHECKING(if make dep has been run in kernel source (host $host_cpu) ) @@ -153,7 +169,7 @@ AC_MSG_CHECKING(for Linux release) dnl We need to rid ourselves of the nasty [ ] quotes. changequote(, ) dnl Get release from version.h -RELEASE="`sed -ne 's/.*UTS_RELEASE[ \"]*\([0-9.a-zA-Z-]*\).*/\1/p' $LINUX/include/linux/version.h`" +RELEASE="`sed -ne 's/.*UTS_RELEASE[ \"]*\([0-9.a-zA-Z_-]*\).*/\1/p' $LINUX/include/linux/version.h`" changequote([, ]) moduledir='$(libdir)/modules/'$RELEASE/kernel diff --git a/lustre/extN/ext3-2.5-noread.diff b/lustre/extN/ext3-2.5-noread.diff new file mode 100644 index 0000000..f1c611f --- /dev/null +++ b/lustre/extN/ext3-2.5-noread.diff @@ -0,0 +1,266 @@ +===== fs/ext3/ialloc.c 1.26 vs edited ===== +--- 1.26/fs/ext3/ialloc.c Fri Feb 14 19:24:09 2003 ++++ edited/fs/ext3/ialloc.c Sat Mar 8 01:20:55 2003 +@@ -195,6 +195,36 @@ + } + + /* ++ * @block_group: block group of inode ++ * @offset: relative offset of inode within @block_group ++ * ++ * Check whether any of the inodes in this disk block are in use. ++ * ++ * Caller must be holding superblock lock (group/bitmap read lock in ++ * future). 
++ */ ++int ext3_itable_block_used(struct super_block *sb, unsigned int block_group, ++ int offset) ++{ ++ struct buffer_head *ibitmap = read_inode_bitmap(sb, block_group); ++ int inodes_per_block; ++ unsigned long inum, iend; ++ ++ if (!ibitmap) ++ return 1; ++ ++ inodes_per_block = sb->s_blocksize / EXT3_SB(sb)->s_inode_size; ++ inum = offset & ~(inodes_per_block - 1); ++ iend = inum + inodes_per_block; ++ for (; inum < iend; inum++) { ++ if (inum != offset && ext3_test_bit(inum, ibitmap->b_data)) ++ return 1; ++ } ++ ++ return 0; ++} ++ ++/* + * There are two policies for allocating an inode. If the new inode is + * a directory, then a forward search is made for a block group with both + * free space and a low directory-to-inode ratio; if that fails, then of +@@ -422,8 +452,9 @@ + struct ext3_group_desc * gdp; + struct ext3_super_block * es; + struct ext3_inode_info *ei; +- int err = 0; ++ struct ext3_iloc iloc; + struct inode *ret; ++ int err = 0; + + /* Cannot create files in a deleted directory */ + if (!dir || !dir->i_nlink) +@@ -587,16 +618,23 @@ + goto fail2; + } + err = ext3_init_acl(handle, inode, dir); ++ if (err) ++ goto fail3; ++ ++ err = ext3_get_inode_loc_new(inode, &iloc, 1); ++ if (err) ++ goto fail3; ++ ++ BUFFER_TRACE(iloc->bh, "get_write_access"); ++ err = ext3_journal_get_write_access(handle, iloc.bh); + if (err) { +- DQUOT_FREE_INODE(inode); +- goto fail2; +- } +- err = ext3_mark_inode_dirty(handle, inode); +- if (err) { +- ext3_std_error(sb, err); +- DQUOT_FREE_INODE(inode); +- goto fail2; +- } ++ brelse(iloc.bh); ++ iloc.bh = NULL; ++ goto fail3; ++ } ++ err = ext3_mark_iloc_dirty(handle, inode, &iloc); ++ if (err) ++ goto fail3; + + ext3_debug("allocating inode %lu\n", inode->i_ino); + goto really_out; +@@ -610,6 +648,9 @@ + brelse(bitmap_bh); + return ret; + ++fail3: ++ ext3_std_error(sb, err); ++ DQUOT_FREE_INODE(inode); + fail2: + inode->i_flags |= S_NOQUOTA; + inode->i_nlink = 0; +===== fs/ext3/inode.c 1.62 vs edited ===== +--- 
1.62/fs/ext3/inode.c Fri Feb 14 19:24:09 2003 ++++ edited/fs/ext3/inode.c Sat Mar 8 02:10:39 2003 +@@ -2144,69 +2144,118 @@ + unlock_kernel(); + } + +-/* +- * ext3_get_inode_loc returns with an extra refcount against the +- * inode's underlying buffer_head on success. +- */ ++#define NUM_INODE_PREREAD 16 + +-int ext3_get_inode_loc (struct inode *inode, struct ext3_iloc *iloc) ++/* ++ * ext3_get_inode_loc returns with an extra refcount against the inode's ++ * underlying buffer_head on success. If this is for a new inode allocation ++ * (new is non-zero) then we may be able to optimize away the read if there ++ * are no other in-use inodes in this inode table block. If we need to do ++ * a read, then read in a whole chunk of blocks to avoid blocking again soon ++ * if we are doing lots of creates/updates. ++ */ ++int ext3_get_inode_loc_new(struct inode *inode, struct ext3_iloc *iloc, int new) + { +- struct buffer_head *bh = 0; ++ struct buffer_head *bh[NUM_INODE_PREREAD]; ++ struct super_block *sb = inode->i_sb; ++ struct ext3_sb_info *sbi = EXT3_SB(sb); ++ unsigned long ino = inode->i_ino; + unsigned long block; + unsigned long block_group; + unsigned long group_desc; + unsigned long desc; + unsigned long offset; + struct ext3_group_desc * gdp; +- +- if ((inode->i_ino != EXT3_ROOT_INO && +- inode->i_ino != EXT3_JOURNAL_INO && +- inode->i_ino < EXT3_FIRST_INO(inode->i_sb)) || +- inode->i_ino > le32_to_cpu( +- EXT3_SB(inode->i_sb)->s_es->s_inodes_count)) { +- ext3_error (inode->i_sb, "ext3_get_inode_loc", +- "bad inode number: %lu", inode->i_ino); ++ ++ if ((ino != EXT3_ROOT_INO && ino != EXT3_JOURNAL_INO && ++ ino < EXT3_FIRST_INO(sb)) || ++ ino > le32_to_cpu(sbi->s_es->s_inodes_count)) { ++ ext3_error(sb, "ext3_get_inode_loc", "bad inode number: %lu", ++ ino); + goto bad_inode; + } +- block_group = (inode->i_ino - 1) / EXT3_INODES_PER_GROUP(inode->i_sb); +- if (block_group >= EXT3_SB(inode->i_sb)->s_groups_count) { +- ext3_error (inode->i_sb, "ext3_get_inode_loc", 
+- "group >= groups count"); ++ block_group = (ino - 1) / EXT3_INODES_PER_GROUP(sb); ++ if (block_group >= EXT3_SB(sb)->s_groups_count) { ++ ext3_error(sb, "ext3_get_inode_loc", "group >= groups count"); + goto bad_inode; + } +- group_desc = block_group >> EXT3_DESC_PER_BLOCK_BITS(inode->i_sb); +- desc = block_group & (EXT3_DESC_PER_BLOCK(inode->i_sb) - 1); +- bh = EXT3_SB(inode->i_sb)->s_group_desc[group_desc]; +- if (!bh) { +- ext3_error (inode->i_sb, "ext3_get_inode_loc", +- "Descriptor not loaded"); ++ group_desc = block_group >> EXT3_DESC_PER_BLOCK_BITS(sb); ++ desc = block_group & (EXT3_DESC_PER_BLOCK(sb) - 1); ++ if (!sbi->s_group_desc[group_desc]) { ++ ext3_error(sb, "ext3_get_inode_loc", "Descriptor not loaded"); + goto bad_inode; + } + +- gdp = (struct ext3_group_desc *) bh->b_data; ++ gdp = (struct ext3_group_desc *)(sbi->s_group_desc[group_desc]->b_data); + /* + * Figure out the offset within the block group inode table + */ +- offset = ((inode->i_ino - 1) % EXT3_INODES_PER_GROUP(inode->i_sb)) * +- EXT3_INODE_SIZE(inode->i_sb); ++ offset = ((ino - 1) % EXT3_INODES_PER_GROUP(sb)); + block = le32_to_cpu(gdp[desc].bg_inode_table) + +- (offset >> EXT3_BLOCK_SIZE_BITS(inode->i_sb)); +- if (!(bh = sb_bread(inode->i_sb, block))) { +- ext3_error (inode->i_sb, "ext3_get_inode_loc", +- "unable to read inode block - " +- "inode=%lu, block=%lu", inode->i_ino, block); +- goto bad_inode; ++ (offset * sbi->s_inode_size >> EXT3_BLOCK_SIZE_BITS(sb)); ++ bh[0] = sb_getblk(sb, block); ++ if (buffer_uptodate(bh[0])) ++ goto done; ++ ++ /* If we don't really need to read this block, and it isn't already ++ * in memory, then we just zero it out. Otherwise, we keep the ++ * current block contents (deleted inode data) for posterity. 
++ */ ++ if (new && !ext3_itable_block_used(sb, block_group, offset)) { ++ lock_buffer(bh[0]); ++ memset(bh[0]->b_data, 0, bh[0]->b_size); ++ set_buffer_uptodate(bh[0]); ++ unlock_buffer(bh[0]); ++ } else { ++ unsigned long block_end, itable_end; ++ int count = 1; ++ ++ itable_end = le32_to_cpu(gdp[desc].bg_inode_table) + ++ sbi->s_itb_per_group; ++ block_end = block + NUM_INODE_PREREAD; ++ if (block_end > itable_end) ++ block_end = itable_end; ++ ++ for (; block < block_end; block++) { ++ bh[count] = sb_getblk(sb, block); ++ if (count && (buffer_uptodate(bh[count]) || ++ buffer_locked(bh[count]))) { ++ __brelse(bh[count]); ++ } else ++ count++; ++ } ++ ++ ll_rw_block(READ, count, bh); ++ ++ /* Release all but the block we actually need (bh[0]) */ ++ while (--count > 0) ++ __brelse(bh[count]); ++ ++ wait_on_buffer(bh[0]); ++ if (!buffer_uptodate(bh[0])) { ++ ext3_error(sb, __FUNCTION__, ++ "unable to read inode block - " ++ "inode=%lu, block=%llu", ino, ++ (unsigned long long)bh[0]->b_blocknr); ++ goto bad_inode; ++ } + } +- offset &= (EXT3_BLOCK_SIZE(inode->i_sb) - 1); ++done: ++ offset = (offset * sbi->s_inode_size) & (EXT3_BLOCK_SIZE(sb) - 1); + +- iloc->bh = bh; +- iloc->raw_inode = (struct ext3_inode *) (bh->b_data + offset); ++ iloc->bh = bh[0]; ++ iloc->raw_inode = (struct ext3_inode *)(bh[0]->b_data + offset); + iloc->block_group = block_group; +- ++ + return 0; +- ++ + bad_inode: + return -EIO; ++} ++ ++int ext3_get_inode_loc(struct inode *inode, struct ext3_iloc *iloc) ++{ ++ return ext3_get_inode_loc_new(inode, iloc, 0); + } + + void ext3_read_inode(struct inode * inode) +===== include/linux/ext3_fs.h 1.22 vs edited ===== +--- 1.22/include/linux/ext3_fs.h Tue Jan 14 00:56:29 2003 ++++ edited/include/linux/ext3_fs.h Sat Mar 8 01:56:28 2003 +@@ -719,6 +719,8 @@ + extern struct buffer_head * ext3_getblk (handle_t *, struct inode *, long, int, int *); + extern struct buffer_head * ext3_bread (handle_t *, struct inode *, int, int, int *); + ++extern int 
ext3_itable_block_used(struct super_block *, unsigned int, int); ++extern int ext3_get_inode_loc_new(struct inode *, struct ext3_iloc *, int); + extern int ext3_get_inode_loc (struct inode *, struct ext3_iloc *); + extern void ext3_read_inode (struct inode *); + extern void ext3_write_inode (struct inode *, int); diff --git a/lustre/extN/extN-noread.diff b/lustre/extN/extN-noread.diff index 463516c..56220e2 100644 --- a/lustre/extN/extN-noread.diff +++ b/lustre/extN/extN-noread.diff @@ -80,7 +80,7 @@ diff -ru lustre-head/fs/extN/ialloc.c lustre/fs/extN/ialloc.c diff -ru lustre-head/fs/extN/inode.c lustre/fs/extN/inode.c --- lustre-head/fs/extN/inode.c Mon Dec 23 10:02:58 2002 +++ lustre/fs/extN/inode.c Mon Dec 23 09:50:25 2002 -@@ -2011,23 +1994,32 @@ +@@ -2011,23 +1994,28 @@ extN_journal_stop(handle, inode); } @@ -88,11 +88,7 @@ diff -ru lustre-head/fs/extN/inode.c lustre/fs/extN/inode.c - * extN_get_inode_loc returns with an extra refcount against the - * inode's underlying buffer_head on success. 
- */ -+extern int extN_itable_block_used(struct super_block *sb, -+ unsigned int block_group, -+ int offset); -+ -+#define NUM_INODE_PREREAD 16 ++#define NUM_INODE_PREREAD 16 -int extN_get_inode_loc (struct inode *inode, struct extN_iloc *iloc) +/* @@ -223,3 +219,15 @@ diff -ru lustre-head/fs/extN/inode.c lustre/fs/extN/inode.c void extN_read_inode(struct inode * inode) { struct extN_iloc iloc; +diff -ru include/linux/extN_fs.h.orig include/linux/extN_fs.h +--- lustre/include/linux/extN_fs.h.orig Sat Mar 8 01:23:09 2003 ++++ lustre/include/linux/extN_fs.h Sat Mar 8 01:24:31 2003 +@@ -642,6 +646,8 @@ + extern struct buffer_head * extN_getblk (handle_t *, struct inode *, long, int, int *); + extern struct buffer_head * extN_bread (handle_t *, struct inode *, int, int, int *); + ++extern int extN_itable_block_used(struct super_block *sb, unsigned int, int); ++extern int extN_get_inode_loc_new(struct inode *, struct extN_iloc *, int); + extern int extN_get_inode_loc (struct inode *, struct extN_iloc *); + extern void extN_read_inode (struct inode *); + extern void extN_write_inode (struct inode *, int); diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index 0b37021..1e57ea4 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -93,7 +93,7 @@ static inline void *kmalloc(int size, int prot) #define kfree(a) free(a) #define GFP_KERNEL 1 #define GFP_HIGHUSER 1 -#define IS_ERR(a) (abs((int)(a)) < 500 ? 1 : 0) +#define IS_ERR(a) (((a) && abs((int)(a)) < 500) ? 
1 : 0) #define PTR_ERR(a) ((int)(a)) #define capable(foo) 1 @@ -258,7 +258,7 @@ static inline struct page *alloc_pages(mask,foo) if (!pg) return NULL; #ifdef MAP_ANONYMOUS - pg->addr = mmap(0, PAGE_SIZE, PROT_WRITE, MAP_ANONYMOUS, 0, 0); + pg->addr = mmap(0, PAGE_SIZE, PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0); #else pg->addr = malloc(PAGE_SIZE); #endif @@ -347,7 +347,6 @@ extern struct task_struct *current; #define init_waitqueue_head(l) INIT_LIST_HEAD(&(l)->sleepers) #define wake_up(l) do { int a; a++; } while (0) -#define wait_event(l,m) do { int a; a++; } while (0) #define TASK_INTERRUPTIBLE 0 #define TASK_UNINTERRUPTIBLE 1 #define TASK_RUNNING 2 diff --git a/lustre/include/linux/lprocfs_status.h b/lustre/include/linux/lprocfs_status.h index cd8f12b..d0060fc 100644 --- a/lustre/include/linux/lprocfs_status.h +++ b/lustre/include/linux/lprocfs_status.h @@ -51,6 +51,7 @@ struct lprocfs_static_vars { extern struct proc_dir_entry *proc_lustre_root; +#ifdef LPROCFS #define LPROCFS_INIT_MULTI_VARS(array, size) \ void lprocfs_init_multi_vars(unsigned int idx, \ struct lprocfs_static_vars *x) \ @@ -62,7 +63,6 @@ void lprocfs_init_multi_vars(unsigned int idx, \ x->obd_vars = glob[idx].obd_vars; \ } \ -#ifdef LPROCFS #define LPROCFS_INIT_VARS(vclass, vinstance) \ void lprocfs_init_vars(struct lprocfs_static_vars *x) \ { \ @@ -135,6 +135,10 @@ int fct_name(char *page, char **start, off_t off, \ static inline struct proc_dir_entry * lprocfs_register(const char *name, struct proc_dir_entry *parent, struct lprocfs_vars *list, void *data) { return NULL; } +#define LPROCFS_INIT_MULTI_VARS(array, size) +static inline void lprocfs_init_multi_vars(unsigned int idx, + struct lprocfs_static_vars *x) { return; } +#define LPROCFS_INIT_VARS(vclass, vinstance) static inline void lprocfs_init_vars(struct lprocfs_static_vars *x) { return; } static inline int lprocfs_add_vars(struct proc_dir_entry *root, struct lprocfs_vars *var, diff --git a/lustre/include/linux/lustre_dlm.h 
b/lustre/include/linux/lustre_dlm.h index c120225..70e7e87 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -26,9 +26,11 @@ typedef enum { ELDLM_LOCK_CHANGED = 300, ELDLM_LOCK_ABORTED = 301, ELDLM_LOCK_REPLACED = 302, + ELDLM_LOCK_MATCHED = 303, ELDLM_NAMESPACE_EXISTS = 400, - ELDLM_BAD_NAMESPACE = 401 + ELDLM_BAD_NAMESPACE = 401, + ELDLM_GETATTR_ERROR = 402 } ldlm_error_t; #define LDLM_NAMESPACE_SERVER 0 @@ -135,7 +137,7 @@ struct ldlm_namespace { * */ -#define RES_HASH_BITS 14 +#define RES_HASH_BITS 10 #define RES_HASH_SIZE (1UL << RES_HASH_BITS) #define RES_HASH_MASK (RES_HASH_SIZE - 1) @@ -342,6 +344,8 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int flags); void ldlm_cancel_callback(struct ldlm_lock *); int ldlm_lock_set_data(struct lustre_handle *, void *data, void *cp_data); void ldlm_lock_remove_from_lru(struct ldlm_lock *); +struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *, + struct lustre_handle *); static inline struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *h) { diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 3ef86ac..b99d996 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -314,6 +314,7 @@ struct obd_statfs { #define OBD_BRW_WRITE 0x2 #define OBD_BRW_RWMASK (OBD_BRW_READ | OBD_BRW_WRITE) #define OBD_BRW_CREATE 0x4 +#define OBD_BRW_SYNC 0x8 #define OBD_OBJECT_EOF 0xffffffffffffffffULL @@ -322,7 +323,7 @@ struct obd_ioobj { obd_gr ioo_gr; __u32 ioo_type; __u32 ioo_bufcnt; -}; +} __attribute__((packed)); struct niobuf_remote { __u64 offset; diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 41c67fff..6f38be0 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -29,6 +29,8 @@ # include #else # include +# include +# include #endif #include #include @@ -106,6 +108,7 @@ struct obd_brw_set { struct list_head brw_desc_head; 
/* list of ptlrpc_bulk_desc */ wait_queue_head_t brw_waitq; atomic_t brw_refcount; + atomic_t brw_desc_count; int brw_flags; int (*brw_callback)(struct obd_brw_set *, int phase); @@ -575,35 +578,45 @@ struct l_wait_info { lwi_cb_data: data \ }) +#define LUSTRE_FATAL_SIGS (sigmask(SIGKILL) | sigmask(SIGINT) | \ + sigmask(SIGTERM) | sigmask(SIGQUIT)) + #ifdef __KERNEL__ -#define l_sigismember sigismember -#else -#define l_sigismember(a,b) (*(a) & b) -#endif +static inline sigset_t l_w_e_set_sigs(int sigs) +{ + sigset_t old; + unsigned long irqflags; -/* XXX this should be one mask-check */ -#define l_killable_pending(task) \ -(l_sigismember(&(task->pending.signal), SIGKILL) || \ - l_sigismember(&(task->pending.signal), SIGINT) || \ - l_sigismember(&(task->pending.signal), SIGTERM)) + spin_lock_irqsave(&current->sigmask_lock, irqflags); + old = current->blocked; + siginitsetinv(&current->blocked, sigs); + recalc_sigpending(current); + spin_unlock_irqrestore(&current->sigmask_lock, irqflags); + + return old; +} #define __l_wait_event(wq, condition, info, ret) \ do { \ wait_queue_t __wait; \ - long __state; \ int __timed_out = 0; \ - init_waitqueue_entry(&__wait, current); \ + unsigned long irqflags; \ + sigset_t blocked; \ \ + init_waitqueue_entry(&__wait, current); \ add_wait_queue(&wq, &__wait); \ + \ + /* Block all signals (just the non-fatal ones if no timeout). */ \ if (info->lwi_signals && !info->lwi_timeout) \ - __state = TASK_INTERRUPTIBLE; \ + blocked = l_w_e_set_sigs(LUSTRE_FATAL_SIGS); \ else \ - __state = TASK_UNINTERRUPTIBLE; \ + blocked = l_w_e_set_sigs(0); \ + \ for (;;) { \ - set_current_state(__state); \ + set_current_state(TASK_INTERRUPTIBLE); \ if (condition) \ break; \ - if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) {\ + if (signal_pending(current)) { \ if (info->lwi_on_signal) \ info->lwi_on_signal(info->lwi_cb_data); \ ret = -EINTR; \ @@ -618,21 +631,19 @@ do { \ break; \ } \ /* We'll take signals after a timeout. 
*/ \ - if (info->lwi_signals) { \ - __state = TASK_INTERRUPTIBLE; \ - /* Check for a pending interrupt. */ \ - if (info->lwi_signals && l_killable_pending(current)) {\ - if (info->lwi_on_signal) \ - info->lwi_on_signal(info->lwi_cb_data); \ - ret = -EINTR; \ - break; \ - } \ - } \ + if (info->lwi_signals) \ + (void)l_w_e_set_sigs(LUSTRE_FATAL_SIGS); \ } \ } else { \ schedule(); \ } \ } \ + \ + spin_lock_irqsave(&current->sigmask_lock, irqflags); \ + current->blocked = blocked; \ + recalc_sigpending(current); \ + spin_unlock_irqrestore(&current->sigmask_lock, irqflags); \ + \ current->state = TASK_RUNNING; \ remove_wait_queue(&wq, &__wait); \ } while(0) @@ -645,5 +656,6 @@ do { \ __l_wait_event(wq, condition, __info, __ret); \ __ret; \ }) +#endif /* __KERNEL__ */ #endif /* _LUSTRE_LIB_H */ diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 0c29c79..9657f24 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -56,11 +56,39 @@ struct ll_inode_info { struct lov_stripe_md *lli_smd; char *lli_symlink_name; struct semaphore lli_open_sem; + atomic_t lli_open_count; /* see ll_file_release */ + /* + * the VALID flag and valid_sem are temporary measures to serialize + * the manual getattrs that we're doing at lock acquisition. in + * the future the OST will always return its notion of the file + * size with the granted locks. + */ + unsigned long lli_flags; +#define LLI_F_DID_GETATTR 0 + struct semaphore lli_getattr_sem; + struct list_head lli_read_extents; + spinlock_t lli_read_extent_lock; + #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) struct inode lli_vfs_inode; #endif }; +/* + * this lets ll_file_read tell ll_readpages how far ahead it can read + * and still be covered by ll_file_read's lock. 2.5 won't need this, but + * we have the other problem of other readpage callers making sure that + * they're covered by a lock.. 
+ */ +struct ll_read_extent { + struct list_head re_lli_item; + struct task_struct *re_task; + struct ldlm_extent re_extent; +}; + +int ll_check_dirty( struct super_block *sb ); +int ll_batch_writepage( struct inode *inode, struct page *page ); + /* interpet return codes from intent lookup */ #define LL_LOOKUP_POSITIVE 1 #define LL_LOOKUP_NEGATIVE 2 @@ -246,11 +274,15 @@ extern struct inode_operations ll_special_inode_operations; struct ldlm_lock; int ll_lock_callback(struct ldlm_lock *, struct ldlm_lock_desc *, void *data, int flag); -int ll_size_lock(struct inode *, struct lov_stripe_md *, obd_off start, - int mode, struct lustre_handle *); -int ll_size_unlock(struct inode *, struct lov_stripe_md *, int mode, - struct lustre_handle *); -int ll_file_size(struct inode *inode, struct lov_stripe_md *md, char *ostdata); +int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, int mode, + struct ldlm_extent *extent, struct lustre_handle *lockh); +int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, int mode, + struct ldlm_extent *extent, struct lustre_handle *lockh); +int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, int mode, + struct lustre_handle *lockh); int ll_create_objects(struct super_block *sb, obd_id id, uid_t uid, gid_t gid, struct lov_stripe_md **lsmp); diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 0a881b1..c951637 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -138,7 +138,7 @@ struct mds_client_data { __u64 mcd_last_xid; /* xid for the last transaction */ __u32 mcd_last_result; /* result from last RPC */ __u32 mcd_last_data; /* per-op data (disposition for open &c.) 
*/ - __u8 padding[MDS_LR_SIZE - 58]; + __u8 padding[MDS_LR_SIZE - 74]; }; /* In-memory access to client data from MDS struct */ diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 8c50212..6966424 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -177,6 +177,7 @@ struct ptlrpc_request { int rq_reqlen; struct lustre_msg *rq_reqmsg; + int rq_timeout; int rq_replen; struct lustre_msg *rq_repmsg; __u64 rq_transno; @@ -368,7 +369,8 @@ int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk); struct obd_brw_set *obd_brw_set_new(void); void obd_brw_set_add(struct obd_brw_set *, struct ptlrpc_bulk_desc *); void obd_brw_set_del(struct ptlrpc_bulk_desc *); -void obd_brw_set_free(struct obd_brw_set *); +void obd_brw_set_decref(struct obd_brw_set *set); +void obd_brw_set_addref(struct obd_brw_set *set); int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req); int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req); diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index f626bab..b571b06 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -34,7 +34,7 @@ #include #include #include -#endif +#endif #include #include @@ -770,6 +770,33 @@ static inline void obdo_from_inode(struct obdo *dst, struct inode *src, dst->o_valid |= (valid & ~OBD_MD_FLID); } +static inline void obdo_refresh_inode(struct inode *dst, struct obdo *src, + obd_flag valid) +{ + valid &= src->o_valid; + +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + if (valid & OBD_MD_FLATIME && src->o_atime > dst->i_atime) + dst->i_atime = src->o_atime; + if (valid & OBD_MD_FLMTIME && src->o_mtime > dst->i_mtime) + dst->i_mtime = src->o_mtime; + if (valid & OBD_MD_FLCTIME && src->o_ctime > dst->i_ctime) + dst->i_ctime = src->o_ctime; +#else + if (valid & OBD_MD_FLATIME && src->o_atime > dst->i_atime.tv_sec) + dst->i_atime.tv_sec = src->o_atime; + if 
(valid & OBD_MD_FLMTIME && src->o_mtime > dst->i_mtime.tv_sec) + dst->i_mtime.tv_sec = src->o_mtime; + if (valid & OBD_MD_FLCTIME && src->o_ctime > dst->i_ctime.tv_sec) + dst->i_ctime.tv_sec = src->o_ctime; +#endif + if (valid & OBD_MD_FLSIZE && src->o_size > dst->i_size) + dst->i_size = src->o_size; + /* allocation of space */ + if (valid & OBD_MD_FLBLOCKS && src->o_blocks > dst->i_blocks) + dst->i_blocks = src->o_blocks; +} + static inline void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) { diff --git a/lustre/include/linux/obd_ost.h b/lustre/include/linux/obd_ost.h index f8d1486..9ef7052 100644 --- a/lustre/include/linux/obd_ost.h +++ b/lustre/include/linux/obd_ost.h @@ -35,10 +35,10 @@ #define LUSTRE_SANOST_NAME "sanost" /* ost/ost_pack.c */ -void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags, - __u32 xid); -void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp); -void ost_pack_ioo(struct obd_ioobj **ioop, struct lov_stripe_md *oa,int bufcnt); -void ost_unpack_ioo(struct obd_ioobj **tmp, struct obd_ioobj **ioop); +void ost_pack_niobuf(struct niobuf_remote *nb, __u64 offset, __u32 len, + __u32 flags, __u32 xid); +void ost_unpack_niobuf(struct niobuf_remote *dst, struct niobuf_remote *src); +void ost_pack_ioo(struct obd_ioobj *ioo, struct lov_stripe_md *lsm, int bufcnt); +void ost_unpack_ioo(struct obd_ioobj *dst, struct obd_ioobj *src); #endif diff --git a/lustre/include/linux/obd_ptlbd.h b/lustre/include/linux/obd_ptlbd.h index b4f9fe9..3af66b5 100644 --- a/lustre/include/linux/obd_ptlbd.h +++ b/lustre/include/linux/obd_ptlbd.h @@ -24,7 +24,7 @@ extern void ptlbd_sv_exit(void); extern void ptlbd_blk_register(struct ptlbd_obd *ptlbd); extern int ptlbd_send_req(struct ptlbd_obd *, ptlbd_cmd_t cmd, - struct buffer_head *); + struct request *); extern int ptlbd_parse_req(struct ptlrpc_request *req); #endif diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index a4d676d..85e577a 
100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -124,13 +124,21 @@ extern unsigned long obd_sync_filter; ((obd_fail_loc & (OBD_FAILED | OBD_FAIL_ONCE))!= \ (OBD_FAILED | OBD_FAIL_ONCE))) -#define OBD_FAIL_RETURN(id, ret) \ -do { \ +#define OBD_FAIL_CHECK_ONCE(id) \ +({ int _ret_ = 0; \ if (OBD_FAIL_CHECK(id)) { \ - CERROR("obd_fail_loc=%x, fail operation rc=%d\n", id, ret); \ + CERROR("obd_fail_loc=%x\n", id); \ obd_fail_loc |= OBD_FAILED; \ if ((id) & OBD_FAIL_ONCE) \ obd_fail_loc |= OBD_FAIL_ONCE; \ + _ret_ = 1; \ + } \ + _ret_; \ +}) + +#define OBD_FAIL_RETURN(id, ret) \ +do { \ + if (OBD_FAIL_CHECK_ONCE(id)) { \ RETURN(ret); \ } \ } while(0) diff --git a/lustre/kernel_patches/patches/iod-stock-24-exports_hp.patch b/lustre/kernel_patches/patches/iod-stock-24-exports_hp.patch new file mode 100644 index 0000000..669b44d --- /dev/null +++ b/lustre/kernel_patches/patches/iod-stock-24-exports_hp.patch @@ -0,0 +1,41 @@ +--- linux-2.4.19-hp2_pnnl4_Lv13/fs/inode.c.iod-export 2003-02-27 14:28:04.000000000 -0800 ++++ linux-2.4.19-hp2_pnnl4_Lv13/fs/inode.c 2003-03-03 13:54:59.000000000 -0800 +@@ -5,6 +5,7 @@ + */ + + #include ++#include + #include + #include + #include +@@ -66,7 +67,8 @@ + * NOTE! You also have to own the lock if you change + * the i_state of an inode while it is in use.. + */ +-static spinlock_t inode_lock = SPIN_LOCK_UNLOCKED; ++spinlock_t inode_lock = SPIN_LOCK_UNLOCKED; ++EXPORT_SYMBOL(inode_lock); + + /* + * Statistics gathering.. 
+--- linux-2.4.19-hp2_pnnl4_Lv13/fs/Makefile.iod-export 2003-02-27 14:28:01.000000000 -0800 ++++ linux-2.4.19-hp2_pnnl4_Lv13/fs/Makefile 2003-03-03 13:56:11.000000000 -0800 +@@ -7,7 +7,7 @@ + + O_TARGET := fs.o + +-export-objs := filesystems.o open.o dcache.o buffer.o dquot.o ++export-objs := filesystems.o open.o dcache.o buffer.o dquot.o inode.o + mod-subdirs := nls xfs + + obj-y := open.o read_write.o devices.o file_table.o buffer.o \ +--- linux-2.4.19-hp2_pnnl4_Lv13/mm/page_alloc.c.iod-export 2003-02-27 14:28:01.000000000 -0800 ++++ linux-2.4.19-hp2_pnnl4_Lv13/mm/page_alloc.c 2003-03-03 13:54:59.000000000 -0800 +@@ -28,6 +28,7 @@ + LIST_HEAD(inactive_list); + LIST_HEAD(active_list); + pg_data_t *pgdat_list; ++EXPORT_SYMBOL(pgdat_list); + + /* Used to look up the address of the struct zone encoded in page->zone */ + zone_t *zone_table[MAX_NR_ZONES*MAX_NR_NODES]; diff --git a/lustre/kernel_patches/series/hp-pnnl b/lustre/kernel_patches/series/hp-pnnl index 6723ab6..bf276fb 100644 --- a/lustre/kernel_patches/series/hp-pnnl +++ b/lustre/kernel_patches/series/hp-pnnl @@ -5,3 +5,4 @@ jbd-transno-cb.patch lustre_version.patch vfs_intent_hp.patch invalidate_show.patch +iod-stock-24-exports_hp.patch diff --git a/lustre/kernel_patches/series/rh-8.0 b/lustre/kernel_patches/series/rh-8.0 deleted file mode 100644 index 2ba39f5..0000000 --- a/lustre/kernel_patches/series/rh-8.0 +++ /dev/null @@ -1,9 +0,0 @@ -dev_read_only.patch -exports.patch -kmem_cache_validate.patch -lustre_version.patch -uml_check_get_page.patch -uml_no_panic.patch -vfs_intent.patch -uml_compile_fixes.patch -invalidate_show.patch diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index c1d3182..dafcb6e 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -25,17 +25,16 @@ #define DEBUG_SUBSYSTEM S_LDLM #ifdef __KERNEL__ -#include -#include -#include -#else -#include +# include +# include +# include +#else +# include #endif #include #include - extern kmem_cache_t 
*ldlm_resource_slab; extern kmem_cache_t *ldlm_lock_slab; extern struct lustre_lock ldlm_handle_lock; @@ -189,6 +188,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, l_unlock(&lock->l_resource->lr_namespace->ns_lock); req->rq_level = LUSTRE_CONN_RECOVD; + req->rq_timeout = 2; rc = ptlrpc_queue_wait(req); if (rc == -ETIMEDOUT || rc == -EINTR) { ldlm_del_waiting_lock(lock); @@ -236,6 +236,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) req->rq_replen = lustre_msg_size(0, NULL); req->rq_level = LUSTRE_CONN_RECOVD; + req->rq_timeout = 2; rc = ptlrpc_queue_wait(req); if (rc == -ETIMEDOUT || rc == -EINTR) { ldlm_del_waiting_lock(lock); @@ -434,28 +435,21 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) RETURN(0); } -struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns, - struct lustre_handle *handle); - -static int ldlm_handle_bl_callback(struct ptlrpc_request *req, - struct ldlm_namespace *ns) +static void ldlm_handle_bl_callback(struct ptlrpc_request *req, + struct ldlm_namespace *ns, + struct ldlm_request *dlm_req, + struct ldlm_lock *lock) { - struct ldlm_request *dlm_req; - struct ldlm_lock *lock; int do_ast; ENTRY; - OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_AST, 0); - - dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); - - lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1); - if (!lock) { - CDEBUG(D_INFO, "blocking callback on lock "LPX64 - " - lock disappeared\n", dlm_req->lock_handle1.cookie); - RETURN(-EINVAL); - } - + /* Try to narrow down this damn iozone bug */ + if (lock->l_resource == NULL) + CERROR("lock %p resource NULL\n", lock); + if (lock->l_resource->lr_type != LDLM_EXTENT) + if (lock->l_resource->lr_namespace != ns) + CERROR("lock %p namespace %p != passed ns %p\n", lock, + lock->l_resource->lr_namespace, ns); LDLM_DEBUG(lock, "client blocking AST callback handler START"); l_lock(&ns->ns_lock); @@ -476,28 +470,17 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req, LDLM_DEBUG(lock, 
"client blocking callback handler END"); LDLM_LOCK_PUT(lock); - RETURN(0); + EXIT; } -static int ldlm_handle_cp_callback(struct ptlrpc_request *req, - struct ldlm_namespace *ns) +static void ldlm_handle_cp_callback(struct ptlrpc_request *req, + struct ldlm_namespace *ns, + struct ldlm_request *dlm_req, + struct ldlm_lock *lock) { - struct list_head ast_list = LIST_HEAD_INIT(ast_list); - struct ldlm_request *dlm_req; - struct ldlm_lock *lock; + LIST_HEAD(ast_list); ENTRY; - OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_CP_AST, 0); - - dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); - - lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1); - if (!lock) { - CERROR("completion callback on lock "LPX64" - lock " - "disappeared\n", dlm_req->lock_handle1.cookie); - RETURN(-EINVAL); - } - LDLM_DEBUG(lock, "client completion callback handler START"); l_lock(&ns->ns_lock); @@ -530,12 +513,24 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", lock); - RETURN(0); + EXIT; +} + +static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) +{ + req->rq_status = rc; + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, + &req->rq_repmsg); + if (rc) + return rc; + return ptlrpc_reply(req->rq_svc, req); } static int ldlm_callback_handler(struct ptlrpc_request *req) { struct ldlm_namespace *ns; + struct ldlm_request *dlm_req; + struct ldlm_lock *lock; int rc; ENTRY; @@ -556,8 +551,17 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n", dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie); - rc = -ENOTCONN; - goto out; + ldlm_callback_reply(req, -ENOTCONN); + RETURN(0); + } + + if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) { + OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); + } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) { + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0); + } else { + 
ldlm_callback_reply(req, -EIO); + RETURN(0); } LASSERT(req->rq_export != NULL); @@ -565,27 +569,30 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) ns = req->rq_export->exp_obd->obd_namespace; LASSERT(ns != NULL); + dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1); + if (!lock) { + CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n", + dlm_req->lock_handle1.cookie); + ldlm_callback_reply(req, -EINVAL); + RETURN(0); + } + + /* we want the ost thread to get this reply so that it can respond + * to ost requests (write cache writeback) that might be triggered + * in the callback */ + ldlm_callback_reply(req, 0); + switch (req->rq_reqmsg->opc) { case LDLM_BL_CALLBACK: CDEBUG(D_INODE, "blocking ast\n"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); - rc = ldlm_handle_bl_callback(req, ns); + ldlm_handle_bl_callback(req, ns, dlm_req, lock); break; case LDLM_CP_CALLBACK: CDEBUG(D_INODE, "completion ast\n"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0); - rc = ldlm_handle_cp_callback(req, ns); + ldlm_handle_cp_callback(req, ns, dlm_req, lock); break; - default: - CERROR("invalid opcode %d\n", req->rq_reqmsg->opc); - RETURN(-EINVAL); } - out: - req->rq_status = rc; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc) - RETURN(rc); - ptlrpc_reply(req->rq_svc, req); RETURN(0); } @@ -610,9 +617,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) req->rq_reqmsg->addr, req->rq_reqmsg->cookie); dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1); - CERROR("--> ignoring this error as a temporary workaround! 
" - "beware!\n"); - //RETURN(-ENOTCONN); + RETURN(-ENOTCONN); } switch (req->rq_reqmsg->opc) { diff --git a/lustre/lib/obd_pack.c b/lustre/lib/obd_pack.c index c77b5b8..c76ff32 100644 --- a/lustre/lib/obd_pack.c +++ b/lustre/lib/obd_pack.c @@ -30,54 +30,35 @@ #include #include -void ost_pack_ioo(struct obd_ioobj **tmp, struct lov_stripe_md *lsm,int bufcnt) +void ost_pack_ioo(struct obd_ioobj *ioo, struct lov_stripe_md *lsm, int bufcnt) { - struct obd_ioobj *ioo = *tmp; - void *p = *tmp; - ioo->ioo_id = HTON__u64(lsm->lsm_object_id); ioo->ioo_gr = HTON__u64(0); ioo->ioo_type = HTON__u32(S_IFREG); ioo->ioo_bufcnt = HTON__u32(bufcnt); - *tmp = p + sizeof(*ioo); } -void ost_unpack_ioo(struct obd_ioobj **tmp, struct obd_ioobj **ioop) +void ost_unpack_ioo(struct obd_ioobj *dst, struct obd_ioobj *src) { - void *p = *tmp; - struct obd_ioobj *ioo = *tmp; - *ioop = *tmp; - - ioo->ioo_id = NTOH__u64(ioo->ioo_id); - ioo->ioo_gr = NTOH__u64(ioo->ioo_gr); - ioo->ioo_type = NTOH__u32(ioo->ioo_type); - ioo->ioo_bufcnt = NTOH__u32(ioo->ioo_bufcnt); - *tmp = p + sizeof(*ioo); + dst->ioo_id = NTOH__u64(src->ioo_id); + dst->ioo_gr = NTOH__u64(src->ioo_gr); + dst->ioo_type = NTOH__u32(src->ioo_type); + dst->ioo_bufcnt = NTOH__u32(src->ioo_bufcnt); } -void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags, - __u32 xid) +void ost_pack_niobuf(struct niobuf_remote *nb, __u64 offset, __u32 len, + __u32 flags, __u32 xid) { - struct niobuf_remote *nb = *tmp; - char *c = *tmp; - nb->offset = HTON__u64(offset); nb->len = HTON__u32(len); - nb->flags = HTON__u32(flags); nb->xid = HTON__u32(xid); - *tmp = c + sizeof(*nb); + nb->flags = HTON__u32(flags); } -void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp) +void ost_unpack_niobuf(struct niobuf_remote *dst, struct niobuf_remote *src) { - char *c = *tmp; - struct niobuf_remote *nb = *tmp; - - *nbp = *tmp; - - nb->offset = NTOH__u64(nb->offset); - nb->len = NTOH__u32(nb->len); - nb->flags = NTOH__u32(nb->flags); - - *tmp = 
c + sizeof(*nb); + dst->offset = NTOH__u64(src->offset); + dst->len = NTOH__u32(src->len); + dst->xid = NTOH__u32(src->xid); + dst->flags = NTOH__u32(src->flags); } diff --git a/lustre/lib/target.c b/lustre/lib/target.c index 590ae4b..82f1164 100644 --- a/lustre/lib/target.c +++ b/lustre/lib/target.c @@ -195,7 +195,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) dlmimp->imp_client = &export->exp_obd->obd_ldlm_client; dlmimp->imp_handle.addr = req->rq_reqmsg->addr; dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie; - dlmimp->imp_obd = /* LDLM! */ NULL; + dlmimp->imp_obd = target; dlmimp->imp_recover = NULL; INIT_LIST_HEAD(&dlmimp->imp_replay_list); INIT_LIST_HEAD(&dlmimp->imp_sending_list); @@ -373,13 +373,14 @@ static void process_recovery_queue(struct obd_device *obd) struct ptlrpc_request, rq_list); if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) { + struct l_wait_info lwi = { 0 }; spin_unlock_bh(&obd->obd_processing_task_lock); CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is " LPD64")\n", obd->obd_next_recovery_transno, req->rq_reqmsg->transno); - wait_event(obd->obd_next_transno_waitq, - check_for_next_transno(obd)); + l_wait_event(obd->obd_next_transno_waitq, + check_for_next_transno(obd), &lwi); spin_lock_bh(&obd->obd_processing_task_lock); if (obd->obd_flags & OBD_ABORT_RECOVERY) { target_abort_recovery(obd); diff --git a/lustre/liblustre/Makefile.am b/lustre/liblustre/Makefile.am index c761a22..665295e 100644 --- a/lustre/liblustre/Makefile.am +++ b/lustre/liblustre/Makefile.am @@ -4,14 +4,14 @@ DEFS= CFLAGS:=-g -O2 -I$(top_srcdir)/utils -I$(PORTALS)/include -I$(srcdir)/../include -Wall -L$(PORTALSLIB) KFLAGS:= -CPPFLAGS = $(HAVE_LIBREADLINE) -LIBS= +CPPFLAGS = $(HAVE_EFENCE) +LIBS = $(LIBEFENCE) LLIBS= ../lov/liblov.a ../obdecho/libobdecho.a ../osc/libosc.a ../ldlm/libldlm.a ../ptlrpc/libptlrpc.a ../obdclass/liblustreclass.a libtest_LDADD := $(LIBREADLINE) $(LLIBS) \ 
$(PORTALS)/user/procbridge/libprocbridge.a $(PORTALS)/user/tcpnal/libtcpnal.a \ - $(PORTALS)/user/util/libtcpnalutil.a $(PORTALS)/user/$(PORTALS)/api/libptlapi.a \ - $(PORTALS)/lib/libptllib.a -lptlctl -lpthread -lefence + $(PORTALS)/user/util/libtcpnalutil.a $(PORTALS)/api/libptlapi.a \ + $(PORTALS)/lib/libptllib.a -lptlctl -lpthread bin_PROGRAMS = libtest libtest_SOURCES = libtest.c diff --git a/lustre/liblustre/libtest.c b/lustre/liblustre/libtest.c index 2941398..c344198 100644 --- a/lustre/liblustre/libtest.c +++ b/lustre/liblustre/libtest.c @@ -10,7 +10,7 @@ #include #include #include -#include <../user/procbridge/procbridge.h> +#include ptl_handle_ni_t tcpnal_ni; diff --git a/lustre/llite/Makefile.am b/lustre/llite/Makefile.am index c536a0a..309088b 100644 --- a/lustre/llite/Makefile.am +++ b/lustre/llite/Makefile.am @@ -9,7 +9,7 @@ MODULE = llite modulefs_DATA = llite.o EXTRA_PROGRAMS = llite -llite_SOURCES = dcache.c commit_callback.c super.c rw.c super25.c +llite_SOURCES = dcache.c commit_callback.c super.c rw.c iod.c super25.c llite_SOURCES += file.c dir.c sysctl.c symlink.c llite_SOURCES += recover.c namei.c lproc_llite.c diff --git a/lustre/llite/commit_callback.c b/lustre/llite/commit_callback.c index 0e17c1a..f8b7e70 100644 --- a/lustre/llite/commit_callback.c +++ b/lustre/llite/commit_callback.c @@ -84,8 +84,9 @@ static int ll_commitcbd_main(void *arg) /* And now, loop forever on requests */ while (1) { - wait_event(sbi->ll_commitcbd_waitq, - ll_commitcbd_check_event(sbi)); + struct l_wait_info lwi = { 0 }; + l_wait_event(sbi->ll_commitcbd_waitq, + ll_commitcbd_check_event(sbi), &lwi); spin_lock(&sbi->ll_commitcbd_lock); if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING) { @@ -112,6 +113,7 @@ static int ll_commitcbd_main(void *arg) int ll_commitcbd_setup(struct ll_sb_info *sbi) { int rc; + struct l_wait_info lwi = { 0 }; ENTRY; rc = kernel_thread(ll_commitcbd_main, (void *) sbi, @@ -120,18 +122,19 @@ int ll_commitcbd_setup(struct ll_sb_info *sbi) 
CERROR("cannot start thread\n"); RETURN(rc); } - wait_event(sbi->ll_commitcbd_ctl_waitq, - sbi->ll_commitcbd_flags & LL_COMMITCBD_RUNNING); + l_wait_event(sbi->ll_commitcbd_ctl_waitq, + sbi->ll_commitcbd_flags & LL_COMMITCBD_RUNNING, &lwi); RETURN(0); } int ll_commitcbd_cleanup(struct ll_sb_info *sbi) { + struct l_wait_info lwi = { 0 }; sbi->ll_commitcbd_flags = LL_COMMITCBD_STOPPING; wake_up(&sbi->ll_commitcbd_waitq); - wait_event(sbi->ll_commitcbd_ctl_waitq, - sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPED); + l_wait_event(sbi->ll_commitcbd_ctl_waitq, + sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPED, &lwi); RETURN(0); } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index ff5d1d6..4c16e1c 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -124,6 +124,11 @@ static int ll_file_release(struct inode *inode, struct file *file) if (!fd) /* no process opened the file after an mcreate */ RETURN(rc = 0); + /* we might not be able to get a valid handle on this file + * again so we really want to flush our write cache.. 
*/ + filemap_fdatasync(inode->i_mapping); + filemap_fdatawait(inode->i_mapping); + if (lsm != NULL) { memset(&oa, 0, sizeof(oa)); oa.o_id = lsm->lsm_object_id; @@ -182,17 +187,17 @@ static int ll_osc_open(struct lustre_handle *conn, struct inode *inode, RETURN(-ENOMEM); oa->o_id = lsm->lsm_object_id; oa->o_mode = S_IFREG; - oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | - OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME); + oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLBLOCKS | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); rc = obd_open(conn, oa, lsm, NULL); if (rc) GOTO(out, rc); file->f_flags &= ~O_LOV_DELAY_CREATE; - obdo_to_inode(inode, oa, (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLMTIME | OBD_MD_FLCTIME)); + obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | + OBD_MD_FLCTIME); - if (oa->o_valid |= OBD_MD_FLHANDLE) + if (oa->o_valid & OBD_MD_FLHANDLE) memcpy(fd->fd_ostdata, obdo_handle(oa), FD_OSTDATA_SIZE); EXIT; @@ -355,85 +360,140 @@ static int ll_file_open(struct inode *inode, struct file *file) return rc; } -int ll_size_lock(struct inode *inode, struct lov_stripe_md *lsm, obd_off start, - int mode, struct lustre_handle *lockh) +/* + * really does the getattr on the inode and updates its fields + */ +int ll_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm, + char *ostdata) +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct obdo oa; + int rc; + ENTRY; + + LASSERT(lsm); + LASSERT(sbi); + + memset(&oa, 0, sizeof oa); + oa.o_id = lsm->lsm_object_id; + oa.o_mode = S_IFREG; + oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | + OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME; + + if (ostdata != NULL) { + memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE); + oa.o_valid |= OBD_MD_FLHANDLE; + } + + rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm); + if (rc) + RETURN(rc); + + obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + + CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lu\n", 
lsm->lsm_object_id, + inode->i_size, inode->i_size); + RETURN(0); +} + +/* + * we've acquired a lock and need to see if we should perform a getattr + * to update the file size that may have been updated by others that had + * their locks canceled. + */ +static int ll_size_validate(struct inode *inode, struct lov_stripe_md *lsm, + char *ostdata, struct ldlm_extent *extent) +{ + struct ll_inode_info *lli = ll_i2info(inode); + int rc = 0; + ENTRY; + + if (test_bit(LLI_F_DID_GETATTR, &lli->lli_flags)) + RETURN(0); + + down(&lli->lli_getattr_sem); + + if (!test_bit(LLI_F_DID_GETATTR, &lli->lli_flags)) { + rc = ll_inode_getattr(inode, lsm, ostdata); + if ( rc == 0 ) + set_bit(LLI_F_DID_GETATTR, &lli->lli_flags); + } + + up(&lli->lli_getattr_sem); + RETURN(rc); +} + +/* + * some callers, notably truncate, really don't want i_size set based + * on the the size returned by the getattr, or lock acquisition in + * the future. + */ +int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, + int mode, struct ldlm_extent *extent, + struct lustre_handle *lockh) { struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ldlm_extent extent; int rc, flags = 0; ENTRY; + LASSERT(lockh->addr == 0 && lockh->cookie == 0); + /* XXX phil: can we do this? won't it screw the file size up? 
*/ - if (sbi->ll_flags & LL_SBI_NOLCK) + if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || + (sbi->ll_flags & LL_SBI_NOLCK)) RETURN(0); - extent.start = start; - extent.end = OBD_OBJECT_EOF; + CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n", + inode->i_ino, extent->start, extent->end); - rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent, + rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, extent, sizeof(extent), mode, &flags, ll_lock_callback, inode, sizeof(*inode), lockh); + RETURN(rc); } - -int ll_size_unlock(struct inode *inode, struct lov_stripe_md *lsm, int mode, +/* + * this grabs a lock and manually implements behaviour that makes it look + * like the OST is returning the file size with each lock acquisition + */ +int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, + int mode, struct ldlm_extent *extent, struct lustre_handle *lockh) { - struct ll_sb_info *sbi = ll_i2sbi(inode); int rc; ENTRY; - /* XXX phil: can we do this? won't it screw the file size up? */ - if (sbi->ll_flags & LL_SBI_NOLCK) - RETURN(0); + rc = ll_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh); - rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh); - if (rc != ELDLM_OK) { - CERROR("lock cancel: %d\n", rc); - LBUG(); + if (rc == ELDLM_OK) { + rc = ll_size_validate(inode, lsm, fd ? fd->fd_ostdata : NULL, + extent); + if ( rc != 0 ) { + ll_extent_unlock(fd, inode, lsm, mode, lockh); + rc = ELDLM_GETATTR_ERROR; + } } RETURN(rc); } -/* This function is solely "sampling" the file size, and does not explicit - * locking on the size itself (see ll_size_lock() and ll_size_unlock()). - * - * XXX We need to optimize away the obd_getattr for decent performance here, - * by checking if we already have the size lock and considering our size - * authoritative in that case. 
In order to do that either the act of - * getting the size lock includes retrieving the file size, or the client - * keeps an atomic flag in the inode which indicates whether the size - * has been updated (see bug 280). - */ -int ll_file_size(struct inode *inode, struct lov_stripe_md *lsm, char *ostdata) +int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, int mode, + struct lustre_handle *lockh) { struct ll_sb_info *sbi = ll_i2sbi(inode); - struct obdo oa; int rc; ENTRY; - LASSERT(lsm); - LASSERT(sbi); - - memset(&oa, 0, sizeof oa); - oa.o_id = lsm->lsm_object_id; - oa.o_mode = S_IFREG; - oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | - OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME; - - if (ostdata != NULL) { - memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE); - oa.o_valid |= OBD_MD_FLHANDLE; - } + /* XXX phil: can we do this? won't it screw the file size up? */ + if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || + (sbi->ll_flags & LL_SBI_NOLCK)) + RETURN(0); - rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm); - if (!rc) { - obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLMTIME | OBD_MD_FLCTIME); - CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lx\n", - lsm->lsm_object_id, inode->i_size, inode->i_size); - } + rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh); RETURN(rc); } @@ -481,6 +541,7 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, void *data, int flag) { struct inode *inode = data; + struct ll_inode_info *lli = ll_i2info(inode); struct lustre_handle lockh = { 0, 0 }; int rc; ENTRY; @@ -497,11 +558,15 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, CERROR("ldlm_cli_cancel failed: %d\n", rc); break; case LDLM_CB_CANCELING: + /* FIXME: we could be given 'canceling intents' so that we + * could know to write-back or simply throw away the pages + * based on if the cancel comes from a desire to, say, + * read or truncate.. 
*/ CDEBUG(D_INODE, "invalidating obdo/inode %lu\n", inode->i_ino); - /* FIXME: do something better than throwing away everything */ - //down(&inode->i_sem); - ll_invalidate_inode_pages(inode); - //up(&inode->i_sem); + filemap_fdatasync(inode->i_mapping); + filemap_fdatawait(inode->i_mapping); + clear_bit(LLI_F_DID_GETATTR, &lli->lli_flags); + truncate_inode_pages(inode->i_mapping, 0); break; default: LBUG(); @@ -515,57 +580,49 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, { struct ll_file_data *fd = filp->private_data; struct inode *inode = filp->f_dentry->d_inode; - struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; struct lustre_handle lockh = { 0, 0 }; - struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - int flags = 0; + struct ll_read_extent rextent; ldlm_error_t err; ssize_t retval; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op\n"); - if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) && - !(sbi->ll_flags & LL_SBI_NOLCK)) { - struct ldlm_extent extent; - extent.start = *ppos; - extent.end = *ppos + count - 1; - CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n", - inode->i_ino, extent.start, extent.end); - - err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, - &extent, sizeof(extent), LCK_PR, &flags, - ll_lock_callback, inode, sizeof(*inode), - &lockh); - if (err != ELDLM_OK) { - CERROR("lock enqueue: err: %d\n", err); - RETURN(err); - } - } - /* If we don't refresh the file size, generic_file_read may not even - * call ll_readpage */ - retval = ll_file_size(inode, lsm, fd->fd_ostdata); - if (retval < 0) { - CERROR("ll_file_size: "LPSZ"\n", retval); + /* "If nbyte is 0, read() will return 0 and have no other results." 
+ * -- Single Unix Spec */ + if (count == 0) + RETURN(0); + + rextent.re_extent.start = *ppos; + rextent.re_extent.end = *ppos + count - 1; + + err = ll_extent_lock(fd, inode, lsm, + LCK_PR, &rextent.re_extent, &lockh); + if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) { + retval = -ENOLCK; RETURN(retval); } + /* XXX tell ll_readpage what pages have a PR lock.. */ + rextent.re_task = current; + spin_lock(&lli->lli_read_extent_lock); + list_add(&rextent.re_lli_item, &lli->lli_read_extents); + spin_unlock(&lli->lli_read_extent_lock); + CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n", inode->i_ino, count, *ppos); retval = generic_file_read(filp, buf, count, ppos); + spin_lock(&lli->lli_read_extent_lock); + list_del(&rextent.re_lli_item); + spin_unlock(&lli->lli_read_extent_lock); + if (retval > 0) ll_update_atime(inode); - if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) && - !(sbi->ll_flags & LL_SBI_NOLCK)) { - err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, &lockh); - if (err != ELDLM_OK) { - CERROR("lock cancel: err: %d\n", err); - retval = err; - } - } - + /* XXX errors? */ + ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh); RETURN(retval); } @@ -577,71 +634,43 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) { struct ll_file_data *fd = file->private_data; struct inode *inode = file->f_dentry->d_inode; - struct ll_sb_info *sbi = ll_i2sbi(inode); - struct lustre_handle lockh = { 0, 0 }, eof_lockh = { 0, 0 }; + struct lustre_handle lockh = { 0, 0 }; struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - int flags = 0; + struct ldlm_extent extent; ldlm_error_t err; ssize_t retval; ENTRY; /* POSIX, but surprised the VFS doesn't check this already */ if (count == 0) - return 0; + RETURN(0); CDEBUG(D_VFSTRACE, "VFS Op\n"); if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) { - err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockh); - if (err) - RETURN(err); - - /* Get size here so we know extent to enqueue write lock on. 
*/ - retval = ll_file_size(inode, lsm, fd->fd_ostdata); - if (retval) - GOTO(out_eof, retval); - - *ppos = inode->i_size; - } - - if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) && - !(sbi->ll_flags & LL_SBI_NOLCK)) { - struct ldlm_extent extent; + extent.start = 0; + extent.end = OBD_OBJECT_EOF; + } else { extent.start = *ppos; extent.end = *ppos + count - 1; - CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n", - inode->i_ino, extent.start, extent.end); - - err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, - &extent, sizeof(extent), LCK_PW, &flags, - ll_lock_callback, inode, sizeof(*inode), - &lockh); - if (err != ELDLM_OK) { - CERROR("lock enqueue: err: %d\n", err); - GOTO(out_eof, retval = err); - } } + err = ll_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh); + if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) { + retval = -ENOLCK; + RETURN(retval); + } + + if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) + *ppos = inode->i_size; + CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n", inode->i_ino, count, *ppos); retval = generic_file_write(file, buf, count, ppos); - if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) && - !(sbi->ll_flags & LL_SBI_NOLCK)) { - err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, &lockh); - if (err != ELDLM_OK) - CERROR("lock cancel: err: %d\n", err); - } - - EXIT; - out_eof: - if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) { - err = ll_size_unlock(inode, lsm, LCK_PW, &eof_lockh); - if (err) - CERROR("ll_size_unlock: %d\n", err); - } - - return retval; + /* XXX errors? 
*/ + ll_extent_unlock(fd, inode, lsm, LCK_PW, &lockh); + RETURN(retval); } static int ll_lov_setstripe(struct inode *inode, struct file *file, @@ -749,25 +778,27 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd, loff_t ll_file_seek(struct file *file, loff_t offset, int origin) { struct inode *inode = file->f_dentry->d_inode; - long long retval; + struct ll_file_data *fd = file->private_data; + struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + struct lustre_handle lockh = {0, 0}; + loff_t retval; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op\n"); - switch (origin) { - case 2: { - struct ll_inode_info *lli = ll_i2info(inode); - struct ll_file_data *fd = file->private_data; - - retval = ll_file_size(inode, lli->lli_smd, fd->fd_ostdata); - if (retval) + if (origin == 2) { /* SEEK_END */ + ldlm_error_t err; + struct ldlm_extent extent = {0, OBD_OBJECT_EOF}; + err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh); + if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) { + retval = -ENOLCK; RETURN(retval); + } offset += inode->i_size; - break; - } - case 1: + } else if (origin == 1) { /* SEEK_CUR */ offset += file->f_pos; } + retval = -EINVAL; if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) { if (offset != file->f_pos) { @@ -779,14 +810,28 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin) } retval = offset; } + + if (origin == 2) + ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh); RETURN(retval); } -/* XXX this does not need to do anything for data, it _does_ need to - call setattr */ int ll_fsync(struct file *file, struct dentry *dentry, int data) { - return 0; + int ret; + ENTRY; + + /* + * filemap_fdata{sync,wait} are also called at PW lock cancelation so + * we know that they can only find data to writeback here if we are + * still holding the PW lock that covered the dirty pages. XXX we + * should probably get a reference on it, though, just to be clear. 
+ */ + ret = filemap_fdatasync(dentry->d_inode->i_mapping); + if ( ret == 0 ) + ret = filemap_fdatawait(dentry->d_inode->i_mapping); + + RETURN(ret); } int ll_inode_revalidate(struct dentry *dentry) @@ -848,10 +893,24 @@ int ll_inode_revalidate(struct dentry *dentry) if (!lsm) /* object not yet allocated, don't validate size */ RETURN(0); - /* XXX this should probably become an unconditional obd_getattr() - * so that we update the blocks count and mtime from the OST too. + /* + * unfortunately stat comes in through revalidate and we don't + * differentiate this use from initial instantiation. we're + * also being wildly conservative and flushing write caches + * so that stat really returns the proper size. */ - RETURN(ll_file_size(inode, lsm, NULL)); + { + struct ldlm_extent extent = {0, OBD_OBJECT_EOF}; + struct lustre_handle lockh = {0, 0}; + ldlm_error_t err; + + err = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh); + if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED ) + RETURN(-abs(err)); /* XXX can't be right */ + + ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh); + } + RETURN(0); } #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) @@ -889,7 +948,7 @@ struct file_operations ll_file_operations = { release: ll_file_release, mmap: generic_file_mmap, llseek: ll_file_seek, - fsync: NULL + fsync: ll_fsync, }; struct inode_operations ll_file_inode_operations = { diff --git a/lustre/llite/iod.c b/lustre/llite/iod.c new file mode 100644 index 0000000..3a045f4 --- /dev/null +++ b/lustre/llite/iod.c @@ -0,0 +1,415 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (c) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. 
+ * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc + * + * this started as an implementation of an io daemon that woke regularly + * to force writeback.. the throttling in prepare_write and kupdate's usual + * writeback pressure got rid of our thread, but the file name remains. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* PG_inactive_clean is shorthand for rmap, we want free_high/low here.. */ +#ifdef PG_inactive_clean +#include +#endif + +#define DEBUG_SUBSYSTEM S_LLITE +#include + +#ifndef list_for_each_prev_safe +#define list_for_each_prev_safe(pos, n, head) \ + for (pos = (head)->prev, n = pos->prev; pos != (head); \ + pos = n, n = pos->prev ) +#endif + +extern spinlock_t inode_lock; + +#define LLWP_MAX_PAGES (PTL_MD_MAX_IOV) +struct ll_writeback_pages { + unsigned has_whole_pages:1, + num_frags:2, + num_pages:29; + struct brw_page pgs[LLWP_MAX_PAGES]; +}; + + +/* + * ugh, we want disk allocation on the target to happen in offset order. we'll + * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do + * fine for our small page arrays and doesn't require allocation. its an + * insertion sort that swaps elements that are strides apart, shrinking the + * stride down until its '1' and the array is sorted. 
+ */ +void sort_brw_pages(struct brw_page *array, int num) +{ + int stride, i, j; + struct brw_page tmp; + + if ( num == 1 ) + return; + + for( stride = 1; stride < num ; stride = (stride*3) +1 ) + ; + + do { + stride /= 3; + for ( i = stride ; i < num ; i++ ) { + tmp = array[i]; + j = i; + while ( j >= stride && + array[j - stride].off > tmp.off ) { + array[j] = array[j - stride]; + j -= stride; + } + array[j] = tmp; + } + } while ( stride > 1 ); +} + +/* + * returns 0 if the page was inserted in the array because it was + * within i_size. if we raced with truncate and i_size was less + * than the page we can unlock the page because truncate_inode_pages will + * be waiting to cleanup the page + */ +static int llwp_consume_page(struct ll_writeback_pages *llwp, + struct inode *inode, struct page *page) +{ + obd_off off = ((obd_off)page->index) << PAGE_SHIFT; + struct brw_page *pg; + + /* we raced with truncate? */ + if ( off >= inode->i_size ) { + unlock_page(page); + goto out; + } + + page_cache_get(page); + pg = &llwp->pgs[llwp->num_pages]; + llwp->num_pages++; + + pg->pg = page; + pg->off = off; + pg->flag = OBD_BRW_CREATE; + pg->count = PAGE_SIZE; + + /* catch partial writes for files that end mid-page */ + if ( pg->off + pg->count > inode->i_size ) + pg->count = inode->i_size & ~PAGE_MASK; + + if ( pg->count == PAGE_SIZE ) { + if ( ! 
llwp->has_whole_pages ) { + llwp->has_whole_pages = 1; + llwp->num_frags++; + } + } else { + llwp->num_frags++; + } + + /* + * matches ptlrpc_bulk_get assert that trickles down + * from a 0 page length going through niobuf and into + * the buffer regions being posted + */ + LASSERT(pg->count >= 0); + + CDEBUG(D_CACHE, "brw_page %p: off "LPU64" cnt %d, page %p: ind %ld" + " i_size: "LPU64"\n", pg, pg->off, pg->count, page, + page->index, inode->i_size); + + if ( llwp->num_frags == 3 || llwp->num_pages == LLWP_MAX_PAGES ) + return -1; + +out: + return 0; +} + +/* + * returns the number of pages that it added to the pgs array + * + * this duplicates filemap_fdatasync and gives us an opportunity to grab lots + * of dirty pages.. + */ +static void ll_get_dirty_pages(struct inode *inode, + struct ll_writeback_pages *llwp) +{ + struct address_space *mapping = inode->i_mapping; + struct page *page; + struct list_head *pos, *n; + ENTRY; + + spin_lock(&pagecache_lock); + + list_for_each_prev_safe(pos, n, &mapping->dirty_pages) { + page = list_entry(pos, struct page, list); + + if (TryLockPage(page)) + continue; + + list_del(&page->list); + list_add(&page->list, &mapping->locked_pages); + + if ( ! 
PageDirty(page) ) { + unlock_page(page); + continue; + } + ClearPageDirty(page); + + if ( llwp_consume_page(llwp, inode, page) != 0) + break; + } + + spin_unlock(&pagecache_lock); + EXIT; +} + +static void ll_brw_pages_unlock( struct inode *inode, + struct ll_writeback_pages *llwp) +{ + int rc, i; + struct obd_brw_set *set; + ENTRY; + + sort_brw_pages(llwp->pgs, llwp->num_pages); + + set = obd_brw_set_new(); + if (set == NULL) { + EXIT; + return; + } + set->brw_callback = ll_brw_sync_wait; + + rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), + ll_i2info(inode)->lli_smd, llwp->num_pages, llwp->pgs, + set, NULL); + if (rc) { + CERROR("error from obd_brw: rc = %d\n", rc); + } else { + rc = ll_brw_sync_wait(set, CB_PHASE_START); + if (rc) + CERROR("error from callback: rc = %d\n", rc); + } + obd_brw_set_decref(set); + + /* XXX this doesn't make sense to me */ + rc = 0; + + for ( i = 0 ; i < llwp->num_pages ; i++) { + struct page *page = llwp->pgs[i].pg; + + CDEBUG(D_CACHE, "cleaning page %p\n", page); + LASSERT(PageLocked(page)); + unlock_page(page); + page_cache_release(page); + } + + EXIT; +} + +#ifndef PG_inactive_clean +#ifdef CONFIG_DISCONTIGMEM +#error "sorry, we don't support DISCONTIGMEM yet" +#endif +/* + * __alloc_pages marks a zone as needing balancing if an allocation is + * performed when the zone has fewer free pages than its 'low' water + * mark. its cleared when try_to_free_pages makes progress. + */ +static int zones_need_balancing(void) +{ + pg_data_t * pgdat; + zone_t *zone; + int i; + + for ( pgdat = pgdat_list ; pgdat != NULL ; pgdat = pgdat->node_next ) { + for ( i = pgdat->nr_zones-1 ; i >= 0 ; i-- ) { + zone = &pgdat->node_zones[i]; + + if ( zone->need_balance ) + return 1; + } + } + return 0; +} +#endif +/* 2.4 doesn't give us a way to find out how many pages we have + * cached 'cause we're not using buffer_heads. 
we are very + * conservative here and flush the superblock of all dirty data + * when the vm (rmap or stock) thinks that it is running low + * and kswapd would have done work. kupdated isn't good enough + * because writers (dbench) can dirty _very quickly_, and we + * allocate under writepage.. + * + * 2.5 gets this right, see the {inc,dec}_page_state(nr_dirty, ) + */ +static int should_writeback(void) +{ +#ifdef PG_inactive_clean + if (free_high(ALL_ZONES) > 0 || free_low(ANY_ZONE) > 0) +#else + if (zones_need_balancing()) +#endif + return 1; + return 0; +} + +int ll_check_dirty( struct super_block *sb) +{ + unsigned long old_flags; /* hack? */ + int making_progress; + struct ll_writeback_pages *llwp; + struct inode *inode; + int rc = 0; + ENTRY; + + if ( ! should_writeback() ) + return 0; + + old_flags = current->flags; + current->flags |= PF_MEMALLOC; + llwp = kmalloc(sizeof(struct ll_writeback_pages), GFP_ATOMIC); + if ( llwp == NULL ) + GOTO(cleanup, rc = -ENOMEM); + memset(llwp, 0, offsetof(struct ll_writeback_pages, pgs)); + + spin_lock(&inode_lock); + + /* + * first we try and write back dirty pages from dirty inodes + * until the VM thinkgs we're ok again.. + */ + do { + struct list_head *pos; + inode = NULL; + making_progress = 0; + + list_for_each_prev(pos, &sb->s_dirty) { + inode = list_entry(pos, struct inode, i_list); + + if ( ! (inode->i_state & I_DIRTY_PAGES) ) { + inode = NULL; + continue; + } + break; + } + + if ( inode == NULL ) + break; + + /* duplicate __sync_one, *sigh* */ + list_del(&inode->i_list); + list_add(&inode->i_list, &inode->i_sb->s_locked_inodes); + inode->i_state |= I_LOCK; + inode->i_state &= ~I_DIRTY_PAGES; + + spin_unlock(&inode_lock); + + do { + memset(llwp, 0, sizeof(*llwp)); + ll_get_dirty_pages(inode, llwp); + if ( llwp->num_pages ) { + ll_brw_pages_unlock(inode, llwp); + rc += llwp->num_pages; + making_progress = 1; + } + } while (llwp->num_pages && should_writeback() ); + + spin_lock(&inode_lock); + + if ( ! 
list_empty(&inode->i_mapping->dirty_pages) ) + inode->i_state |= I_DIRTY_PAGES; + + inode->i_state &= ~I_LOCK; + /* + * we are sneaky and leave the inode on the dirty list, + * even though it might not still be.. + */ + if (!(inode->i_state & I_FREEING)) { + list_del(&inode->i_list); + list_add(&inode->i_list, &inode->i_sb->s_dirty); + } + wake_up(&inode->i_wait); + + } while ( making_progress && should_writeback() ); + + /* + * and if that didn't work, we sleep on any data that might + * be under writeback.. + */ + while ( should_writeback() ) { + if ( list_empty(&sb->s_locked_inodes) ) + break; + + inode = list_entry(sb->s_locked_inodes.next, struct inode, + i_list); + + atomic_inc(&inode->i_count); /* XXX hack? */ + spin_unlock(&inode_lock); + wait_event(inode->i_wait, !(inode->i_state & I_LOCK)); + iput(inode); + spin_lock(&inode_lock); + } + + spin_unlock(&inode_lock); + +cleanup: + if ( llwp != NULL ) + kfree(llwp); + current->flags = old_flags; + + RETURN(rc); +} + +int ll_batch_writepage( struct inode *inode, struct page *page ) +{ + unsigned long old_flags; /* hack? 
*/ + struct ll_writeback_pages *llwp; + int rc = 0; + ENTRY; + + old_flags = current->flags; + current->flags |= PF_MEMALLOC; + llwp = kmalloc(sizeof(struct ll_writeback_pages), GFP_ATOMIC); + if ( llwp == NULL ) + GOTO(cleanup, rc = -ENOMEM); + memset(llwp, 0, offsetof(struct ll_writeback_pages, pgs)); + + llwp_consume_page(llwp, inode, page); + + ll_get_dirty_pages(inode, llwp); + if ( llwp->num_pages ) + ll_brw_pages_unlock(inode, llwp); + +cleanup: + if ( llwp != NULL ) + kfree(llwp); + current->flags = old_flags; + RETURN(rc); +} diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 6818ace..409f308 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -33,6 +33,7 @@ #include #include + #include #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) #include @@ -70,6 +71,7 @@ static void __set_page_clean(struct page *page) list_del(&page->list); list_add(&page->list, &mapping->clean_pages); + /* XXX doesn't inode_lock protect i_state ? */ inode = mapping->host; if (list_empty(&mapping->dirty_pages)) { CDEBUG(D_INODE, "inode clean\n"); @@ -81,7 +83,7 @@ static void __set_page_clean(struct page *page) EXIT; } -inline void set_page_clean(struct page *page) +void set_page_clean(struct page *page) { if (PageDirty(page)) { ClearPageDirty(page); @@ -90,7 +92,7 @@ inline void set_page_clean(struct page *page) } /* SYNCHRONOUS I/O to object storage for an inode */ -static int ll_brw(int cmd, struct inode *inode, struct page *page, int create) +static int ll_brw(int cmd, struct inode *inode, struct page *page, int flags) { struct ll_inode_info *lli = ll_i2info(inode); struct lov_stripe_md *lsm = lli->lli_smd; @@ -112,8 +114,8 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create) pg.count = PAGE_SIZE; CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n", - cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino, - pg.off, pg.off); + cmd & OBD_BRW_WRITE ? 
"write" : "read", pg.count, inode->i_ino, + pg.off, pg.off); if (pg.count == 0) { CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off " LPU64"\n", @@ -121,7 +123,7 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create) page->mapping->host->i_size, page->index, pg.off); } - pg.flag = create ? OBD_BRW_CREATE : 0; + pg.flag = flags; set->brw_callback = ll_brw_sync_wait; rc = obd_brw(cmd, ll_i2obdconn(inode), lsm, 1, &pg, set, NULL); @@ -133,56 +135,174 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create) if (rc) CERROR("error from callback: rc = %d\n", rc); } - obd_brw_set_free(set); + obd_brw_set_decref(set); RETURN(rc); } -/* returns the page unlocked, but with a reference */ -static int ll_readpage(struct file *file, struct page *page) +/* + * we were asked to read a single page but we're going to try and read a batch + * of pages all at once. this vaguely simulates 2.5's readpages. + */ +static int ll_readpage(struct file *file, struct page *first_page) { - struct inode *inode = page->mapping->host; - obd_off offset = ((obd_off)page->index) << PAGE_SHIFT; - int rc = 0; + struct inode *inode = first_page->mapping->host; + struct ll_inode_info *lli = ll_i2info(inode); + struct page *page = first_page; + struct list_head *pos; + struct brw_page *pgs; + struct obd_brw_set *set; + unsigned long end_index, extent_end = 0; + int npgs = 0, rc = 0; ENTRY; - if (!PageLocked(page)) - LBUG(); + LASSERT(PageLocked(page)); + LASSERT(!PageUptodate(page)); + CDEBUG(D_VFSTRACE, "VFS Op\n"); - if (inode->i_size <= offset) { + if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) { CERROR("reading beyond EOF\n"); memset(kmap(page), 0, PAGE_SIZE); kunmap(page); - GOTO(readpage_out, rc); + SetPageUptodate(page); + unlock_page(page); + RETURN(rc); } - /* XXX Workaround for BA OSTs returning short reads at EOF. 
The linux - * OST will return the full page, zero-filled at the end, which - * will just overwrite the data we set here. - * Bug 593 relates to fixing this properly. + pgs = kmalloc(PTL_MD_MAX_IOV * sizeof(*pgs), GFP_USER); + if ( pgs == NULL ) + RETURN(-ENOMEM); + set = obd_brw_set_new(); + if ( set == NULL ) + GOTO(out_pgs, rc = -ENOMEM); + + /* arbitrarily try to read-ahead 8 times what we can pass on + * the wire at once, clamped to file size */ + end_index = first_page->index + + 8 * ((PTL_MD_MAX_IOV * PAGE_SIZE)>>PAGE_CACHE_SHIFT); + if ( end_index > inode->i_size >> PAGE_CACHE_SHIFT ) + end_index = inode->i_size >> PAGE_CACHE_SHIFT; + + /* + * find how far we're allowed to read under the extent ll_file_read + * is passing us.. */ - if (inode->i_size < offset + PAGE_SIZE) { - int count = inode->i_size - offset; - void *addr = kmap(page); - //POISON(addr, 0x7c, count); - memset(addr + count, 0, PAGE_SIZE - count); - kunmap(page); + spin_lock(&lli->lli_read_extent_lock); + list_for_each(pos, &lli->lli_read_extents) { + struct ll_read_extent *rextent; + rextent = list_entry(pos, struct ll_read_extent, re_lli_item); + if ( rextent->re_task != current ) + continue; + + if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end) + /* extent wrapping */ + extent_end = ~0; + else { + extent_end = ( rextent->re_extent.end + PAGE_SIZE ) + << PAGE_CACHE_SHIFT; + /* 32bit indexes, 64bit extents.. */ + if ( ((u64)extent_end >> PAGE_CACHE_SHIFT ) < + rextent->re_extent.end ) + extent_end = ~0; + } + break; } + spin_unlock(&lli->lli_read_extent_lock); + + if ( extent_end == 0 ) { + CERROR("readpage outside ll_file_read, no lock held?\n"); + end_index = page->index + 1; + } else if ( extent_end < end_index ) + end_index = extent_end; + + /* to balance the find_get_page ref the other pages get that is + * decrefed on teardown.. 
*/ + page_cache_get(page); + do { + unsigned long index ; + + pgs[npgs].pg = page; + pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT; + pgs[npgs].flag = 0; + pgs[npgs].count = PAGE_SIZE; + /* XXX Workaround for BA OSTs returning short reads at EOF. + * The linux OST will return the full page, zero-filled at the + * end, which will just overwrite the data we set here. Bug + * 593 relates to fixing this properly. + */ + if (inode->i_size < pgs[npgs].off + PAGE_SIZE) { + int count = inode->i_size - pgs[npgs].off; + void *addr = kmap(page); + pgs[npgs].count = count; + //POISON(addr, 0x7c, count); + memset(addr + count, 0, PAGE_SIZE - count); + kunmap(page); + } + + npgs++; + if ( npgs == PTL_MD_MAX_IOV ) + break; + + /* + * find pages ahead of us that we can read in. + * grab_cache_page waits on pages that are locked so + * we first try find_get_page, which doesn't. this stops + * the worst case behaviour of racing threads waiting on + * each other, but doesn't remove it entirely. + */ + for ( index = page->index + 1, page = NULL ; + page == NULL && index < end_index ; index++ ) { + + /* see if the page already exists and needs updating */ + page = find_get_page(inode->i_mapping, index); + if ( page ) { + if ( Page_Uptodate(page) || TryLockPage(page) ) + goto out_release; + if ( !page->mapping || Page_Uptodate(page)) + goto out_unlock; + } else { + /* ok, we have to create it.. 
*/ + page = grab_cache_page(inode->i_mapping, index); + if ( page == NULL ) + continue; + if ( Page_Uptodate(page) ) + goto out_unlock; + } + + break; + + out_unlock: + unlock_page(page); + out_release: + page_cache_release(page); + page = NULL; + } - if (PageUptodate(page)) { - CERROR("Explain this please?\n"); - GOTO(readpage_out, rc); + } while (page); + + set->brw_callback = ll_brw_sync_wait; + rc = obd_brw(OBD_BRW_READ, ll_i2obdconn(inode), + ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL); + if (rc) { + CERROR("error from obd_brw: rc = %d\n", rc); + } else { + rc = ll_brw_sync_wait(set, CB_PHASE_START); + if (rc) + CERROR("error from callback: rc = %d\n", rc); } + obd_brw_set_decref(set); - CDEBUG(D_VFSTRACE, "VFS Op\n"); - rc = ll_brw(OBD_BRW_READ, inode, page, 0); - EXIT; + while ( --npgs > -1 ) { + page = pgs[npgs].pg; - readpage_out: - if (!rc) - SetPageUptodate(page); - unlock_page(page); - return 0; + if ( rc == 0 ) + SetPageUptodate(page); + unlock_page(page); + page_cache_release(page); + } +out_pgs: + kfree(pgs); + RETURN(rc); } /* ll_readpage */ void ll_truncate(struct inode *inode) @@ -190,12 +310,14 @@ void ll_truncate(struct inode *inode) struct obdo oa = {0}; struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; struct lustre_handle lockh = { 0, 0 }; + struct ldlm_extent extent = {inode->i_size, OBD_OBJECT_EOF}; int err; ENTRY; if (!lsm) { /* object not yet allocated */ inode->i_mtime = inode->i_ctime = CURRENT_TIME; + EXIT; return; } @@ -207,9 +329,11 @@ void ll_truncate(struct inode *inode) CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n", oa.o_id, inode->i_size); - err = ll_size_lock(inode, lsm, inode->i_size, LCK_PW, &lockh); - if (err) { - CERROR("ll_size_lock failed: %d\n", err); + /* i_size has already been set to the new size */ + err = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW, + &extent, &lockh); + if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) { + EXIT; return; } @@ -221,9 +345,9 @@ void 
ll_truncate(struct inode *inode) else obdo_to_inode(inode, &oa, oa.o_valid); - err = ll_size_unlock(inode, lsm, LCK_PW, &lockh); + err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); if (err) - CERROR("ll_size_unlock failed: %d\n", err); + CERROR("ll_extent_unlock failed: %d\n", err); EXIT; return; @@ -237,11 +361,12 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from, struct inode *inode = page->mapping->host; obd_off offset = ((obd_off)page->index) << PAGE_SHIFT; int rc = 0; - char *addr; ENTRY; - addr = kmap(page); - LASSERT(PageLocked(page)); + ll_check_dirty(inode->i_sb); + + if (!PageLocked(page)) + LBUG(); if (PageUptodate(page)) RETURN(0); @@ -254,118 +379,83 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from, RETURN(0); CDEBUG(D_VFSTRACE, "VFS Op\n"); - /* If are writing to a new page, no need to read old data. If we - * haven't already gotten the file size in ll_file_write() since - * we got our extent lock, we need to verify it here before we - * overwrite some other node's write (bug 445). - */ + /* If are writing to a new page, no need to read old data. + * the extent locking and getattr procedures in ll_file_write have + * guaranteed that i_size is stable enough for our zeroing needs */ if (inode->i_size <= offset) { - if (!S_ISBLK(inode->i_mode) && !(file->f_flags & O_APPEND)) { - struct ll_file_data *fd = file->private_data; - struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - - rc = ll_file_size(inode, lsm, fd->fd_ostdata); - if (rc) - GOTO(prepare_done, rc); - } - if (inode->i_size <= offset) { - memset(addr, 0, PAGE_SIZE); - GOTO(prepare_done, rc=0); - } + memset(kmap(page), 0, PAGE_SIZE); + kunmap(page); + GOTO(prepare_done, rc = 0); } rc = ll_brw(OBD_BRW_READ, inode, page, 0); EXIT; prepare_done: - if (!rc) + if (rc == 0) SetPageUptodate(page); - else - kunmap (page); return rc; } -/* Write a page from kupdated or kswapd. +/* + * background file writeback. 
This is called regularly from kupdated to write + * dirty data, from kswapd when memory is low, and from filemap_fdatasync when + * super blocks or inodes are synced.. * - * We unlock the page even in the face of an error, otherwise dirty - * pages could OOM the system if they cannot be written. Also, there - * is nobody to return an error code to from here - the application - * may not even be running anymore. + * obd_brw errors down in _batch_writepage are ignored, so pages are always + * unlocked. Also, there is nobody to return an error code to from here - the + * application may not even be running anymore. * - * Returns the page unlocked, but with a reference. + * this should be async so that things like kswapd can have a chance to + * free some more pages that our allocating writeback may need, but it isn't + * yet. */ -static int ll_writepage(struct page *page) { +static int ll_writepage(struct page *page) +{ struct inode *inode = page->mapping->host; - int err; ENTRY; - LASSERT(PageLocked(page)); - - /* XXX need to make sure we have LDLM lock on this page */ + CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page, + PageLaunder(page), inode); CDEBUG(D_VFSTRACE, "VFS Op\n"); - err = ll_brw(OBD_BRW_WRITE, inode, page, 1); - if (err) - CERROR("ll_brw failure %d\n", err); - else - set_page_clean(page); + LASSERT(PageLocked(page)); - unlock_page(page); - RETURN(err); + /* XXX should obd_brw errors trickle up? */ + ll_batch_writepage(inode, page); + RETURN(0); } - -/* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated - * too */ +/* + * we really don't want to start writeback here, we want to give callers some + * time to further dirty the pages before we write them out. 
+ */ static int ll_commit_write(struct file *file, struct page *page, unsigned from, unsigned to) { struct inode *inode = page->mapping->host; - struct ll_inode_info *lli = ll_i2info(inode); - struct lov_stripe_md *md = lli->lli_smd; - struct brw_page pg; - struct obd_brw_set *set; - int rc, create = 1; loff_t size; ENTRY; - pg.pg = page; - pg.count = to; - /* XXX make the starting offset "from" */ - pg.off = (((obd_off)page->index) << PAGE_SHIFT); - pg.flag = create ? OBD_BRW_CREATE : 0; - - set = obd_brw_set_new(); - if (set == NULL) - RETURN(-ENOMEM); - - SetPageUptodate(page); - - if (!PageLocked(page)) - LBUG(); + LASSERT(inode == file->f_dentry->d_inode); + LASSERT(PageLocked(page)); CDEBUG(D_VFSTRACE, "VFS Op\n"); - CDEBUG(D_INODE, "commit_page writing (off "LPD64"), count %d\n", - pg.off, pg.count); + CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n", + inode, page, from, to, page->index); - set->brw_callback = ll_brw_sync_wait; - rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, 1, &pg, set, NULL); - if (rc) - CERROR("error from obd_brw: rc = %d\n", rc); - else { - rc = ll_brw_sync_wait(set, CB_PHASE_START); - if (rc) - CERROR("error from callback: rc = %d\n", rc); - } - obd_brw_set_free(set); - kunmap(page); + /* to match full page case in prepare_write */ + SetPageUptodate(page); + /* mark the page dirty, put it on mapping->dirty, + * mark the inode PAGES_DIRTY, put it on sb->dirty */ + set_page_dirty(page); - size = pg.off + pg.count; - /* do NOT truncate when writing in the middle of a file */ + /* this is matched by a hack in obdo_to_inode at the moment */ + size = (((obd_off)page->index) << PAGE_SHIFT) + to; if (size > inode->i_size) inode->i_size = size; - RETURN(rc); + RETURN(0); } /* ll_commit_write */ #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) @@ -384,11 +474,17 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, if (!lsm || !lsm->lsm_object_id) RETURN(-ENOMEM); + if ((iobuf->offset & 
(blocksize - 1)) || + (iobuf->length & (blocksize - 1))) + RETURN(-EINVAL); + +#if 0 /* XXX Keep here until we find ia64 problem, it crashes otherwise */ if (blocksize != PAGE_SIZE) { CERROR("direct_IO blocksize != PAGE_SIZE\n"); RETURN(-EINVAL); } +#endif set = obd_brw_set_new(); if (set == NULL) @@ -396,17 +492,12 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages); if (!pga) { - obd_brw_set_free(set); + obd_brw_set_decref(set); RETURN(-ENOMEM); } - CDEBUG(D_PAGE, "blocksize %u, blocknr %lu, iobuf %p: nr_pages %u, " - "array_len %u, offset %u, length %u\n", - blocksize, blocknr, iobuf, iobuf->nr_pages, - iobuf->array_len, iobuf->offset, iobuf->length); - flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */; - offset = (blocknr << inode->i_blkbits) /* + iobuf->offset? */; + offset = (blocknr << inode->i_blkbits); length = iobuf->length; for (i = 0, length = iobuf->length; length > 0; @@ -417,8 +508,6 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK), length); pga[i].flag = flags; - CDEBUG(D_PAGE, "page %d (%p), offset "LPU64", count %u\n", - i, pga[i].pg, pga[i].off, pga[i].count); if (rw == READ) { //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE); //kunmap(iobuf->maplist[i]); @@ -436,7 +525,7 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, if (rc) CERROR("error from callback: rc = %d\n", rc); } - obd_brw_set_free(set); + obd_brw_set_decref(set); if (rc == 0) rc = iobuf->length; @@ -445,52 +534,8 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, } #endif -int ll_flush_inode_pages(struct inode * inode) -{ - obd_count bufs_per_obdo = 0; - obd_size *count = NULL; - obd_off *offset = NULL; - obd_flag *flags = NULL; - int err = 0; - - ENTRY; - -#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0)) - spin_lock(&pagecache_lock); - - 
spin_unlock(&pagecache_lock); -#endif - - - OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo); - OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo); - OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo); - if (!count || !offset || !flags) - GOTO(out, err=-ENOMEM); - -#if 0 - for (i = 0 ; i < bufs_per_obdo ; i++) { - count[i] = PAGE_SIZE; - offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT; - flags[i] = OBD_BRW_CREATE; - } - - err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), - ll_i2info(inode)->lli_smd, bufs_per_obdo, - iobuf->maplist, count, offset, flags, NULL, NULL); - if (err == 0) - err = bufs_per_obdo * 4096; -#endif - out: - OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo); - OBD_FREE(count, sizeof(*count) * bufs_per_obdo); - OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo); - RETURN(err); -} - //#endif - struct address_space_operations ll_aops = { readpage: ll_readpage, #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0)) diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 613c42f..ff754a0 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -57,14 +57,14 @@ static char *ll_read_opt(const char *opt, char *data) ENTRY; CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); - if ( strncmp(opt, data, strlen(opt)) ) + if (strncmp(opt, data, strlen(opt))) RETURN(NULL); - if ( (value = strchr(data, '=')) == NULL ) + if ((value = strchr(data, '=')) == NULL) RETURN(NULL); value++; OBD_ALLOC(retval, strlen(value) + 1); - if ( !retval ) { + if (!retval) { CERROR("out of memory!\n"); RETURN(NULL); } @@ -79,7 +79,7 @@ static int ll_set_opt(const char *opt, char *data, int fl) ENTRY; CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); - if ( strncmp(opt, data, strlen(opt)) ) + if (strncmp(opt, data, strlen(opt))) RETURN(0); else RETURN(fl); @@ -99,10 +99,11 @@ static void ll_options(char *options, char **ost, char **mds, int *flags) this_char != NULL; this_char = strtok (NULL, ",")) { CDEBUG(D_SUPER, "this_char %s\n", this_char); - if ( 
(!*ost && (*ost = ll_read_opt("osc", this_char)))|| - (!*mds && (*mds = ll_read_opt("mdc", this_char)))|| - (!(*flags & LL_SBI_NOLCK) && ((*flags) = (*flags) | - ll_set_opt("nolock", this_char, LL_SBI_NOLCK))) ) + if ((!*ost && (*ost = ll_read_opt("osc", this_char)))|| + (!*mds && (*mds = ll_read_opt("mdc", this_char)))|| + (!(*flags & LL_SBI_NOLCK) && + ((*flags) = (*flags) | + ll_set_opt("nolock", this_char, LL_SBI_NOLCK)))) continue; } EXIT; @@ -466,6 +467,20 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) ENTRY; if ((attr->ia_valid & ATTR_SIZE)) { + /* writeback uses inode->i_size to determine how far out + * its cached pages go. ll_truncate gets a PW lock, canceling + * our lock, _after_ it has updated i_size. this can confuse + * us into zero extending the file to the newly truncated + * size, and this has bad implications for a racing o_append. + * if we're extending our size we need to flush the pages + * with the correct i_size before vmtruncate stomps on + * the new i_size. again, this can only find pages to + * purge if the PW lock that generated them is still held. + */ + if ( attr->ia_size > inode->i_size ) { + filemap_fdatasync(inode->i_mapping); + filemap_fdatawait(inode->i_mapping); + } err = vmtruncate(inode, attr->ia_size); if (err) RETURN(err); @@ -613,21 +628,32 @@ static void ll_read_inode2(struct inode *inode, void *opaque) CDEBUG(D_VFSTRACE, "VFS Op\n"); sema_init(&lli->lli_open_sem, 1); + atomic_set(&lli->lli_open_count, 0); + lli->lli_flags = 0; + init_MUTEX(&lli->lli_getattr_sem); + spin_lock_init(&lli->lli_read_extent_lock); + INIT_LIST_HEAD(&lli->lli_read_extents); LASSERT(!lli->lli_smd); - /* core attributes first */ + /* core attributes from the MDS first */ ll_update_inode(inode, body, lic ? 
lic->lic_lmm : NULL); /* Get the authoritative file size */ if (lli->lli_smd && (inode->i_mode & S_IFREG)) { - int rc; + struct ldlm_extent extent = {0, OBD_OBJECT_EOF}; + struct lustre_handle lockh = {0, 0}; + struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + ldlm_error_t rc; + LASSERT(lli->lli_smd->lsm_object_id != 0); - rc = ll_file_size(inode, lli->lli_smd, NULL); - if (rc) { - CERROR("ll_file_size: %d\n", rc); + + rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh); + if (rc != ELDLM_OK && rc != ELDLM_LOCK_MATCHED) { ll_clear_inode(inode); make_bad_inode(inode); + } else { + ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh); } } diff --git a/lustre/llite/super25.c b/lustre/llite/super25.c index f77fdea..f296d10 100644 --- a/lustre/llite/super25.c +++ b/lustre/llite/super25.c @@ -63,14 +63,14 @@ static char *ll_read_opt(const char *opt, char *data) ENTRY; CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); - if ( strncmp(opt, data, strlen(opt)) ) + if (strncmp(opt, data, strlen(opt))) RETURN(NULL); - if ( (value = strchr(data, '=')) == NULL ) + if ((value = strchr(data, '=')) == NULL) RETURN(NULL); value++; OBD_ALLOC(retval, strlen(value) + 1); - if ( !retval ) { + if (!retval) { CERROR("out of memory!\n"); RETURN(NULL); } @@ -85,7 +85,7 @@ static int ll_set_opt(const char *opt, char *data, int fl) ENTRY; CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); - if ( strncmp(opt, data, strlen(opt)) ) + if (strncmp(opt, data, strlen(opt))) RETURN(0); else RETURN(fl); @@ -104,10 +104,11 @@ static void ll_options(char *options, char **ost, char **mds, int *flags) while ((this_char = strsep (&opt_ptr, ",")) != NULL) { CDEBUG(D_SUPER, "this_char %s\n", this_char); - if ( (!*ost && (*ost = ll_read_opt("osc", this_char)))|| - (!*mds && (*mds = ll_read_opt("mdc", this_char)))|| - (!(*flags & LL_SBI_NOLCK) && ((*flags) = (*flags) | - ll_set_opt("nolock", this_char, LL_SBI_NOLCK))) ) + if ((!*ost && (*ost = ll_read_opt("osc", this_char)))|| + (!*mds && 
(*mds = ll_read_opt("mdc", this_char)))|| + (!(*flags & LL_SBI_NOLCK) && + ((*flags) = (*flags) | + ll_set_opt("nolock", this_char, LL_SBI_NOLCK)))) continue; } EXIT; @@ -572,6 +573,11 @@ int ll_read_inode2(struct inode *inode, void *opaque) ENTRY; sema_init(&lli->lli_open_sem, 1); + lli->flags = 0; + init_MUTEX(&lli->lli_getattr_sem); + /* these are 2.4 only, but putting them here for consistency.. */ + spin_lock_init(&lli->lli_read_extent_lock); + INIT_LIST_HEAD(&lli->lli_read_extents); LASSERT(!lli->lli_smd); @@ -580,12 +586,19 @@ int ll_read_inode2(struct inode *inode, void *opaque) /* Get the authoritative file size */ if (lli->lli_smd && S_ISREG(inode->i_mode)) { - rc = ll_file_size(inode, lli->lli_smd, NULL); - if (rc) { - CERROR("ll_file_size: %d\n", rc); + struct ll_file_data *fd = file->private_data; + struct ldlm_extent extent = {0, OBD_OBJECT_EOF}; + struct lustre_handle lockh = {0, 0}; + + LASSERT(lli->lli_smd->lsm_object_id != 0); + + rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh); + if (err != ELDLM_OK && err != ELDLM_MATCHED) { ll_clear_inode(inode); make_bad_inode(inode); - RETURN(rc); + } else { + l_extent_unlock(fd, inode, lsm, LCK_PR, &extent, + &lockh); } } @@ -661,6 +674,7 @@ static struct inode *ll_alloc_inode(struct super_block *sb) memset(lli, 0, (char *)&lli->lli_vfs_inode - (char *)lli); sema_init(&lli->lli_open_sem, 1); + init_MUTEX(&lli->lli_size_valid_sem); return &lli->lli_vfs_inode; } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 0e7ad82..19738b9 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1267,7 +1267,8 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm, struct lov_obd *lov; struct lov_oinfo *loi; struct lov_stripe_md submd; - int rc = 0, i; + ldlm_error_t rc = ELDLM_LOCK_MATCHED, err; + int i; ENTRY; if (!lsm) { @@ -1322,20 +1323,27 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm, submd.lsm_stripe_count = 0; /* XXX submd 
is not fully initialized here */ *flags = 0; - rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd, - parent_lock, type, &sub_ext, sizeof(sub_ext), - mode, flags, cb, data, datalen, lov_lockhp); + err = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd, + parent_lock, type, &sub_ext, sizeof(sub_ext), + mode, flags, cb, data, datalen, lov_lockhp); + // XXX add a lock debug statement here - if (rc) + /* return _MATCHED only when all locks matched.. */ + if (err == ELDLM_OK) { + rc = ELDLM_OK; + } else if (err != ELDLM_LOCK_MATCHED) { + rc = err; memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - if (rc && lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: enqueue objid "LPX64" subobj "LPX64 - " on OST idx %d: rc = %d\n", lsm->lsm_object_id, - loi->loi_id, loi->loi_ost_idx, rc); - goto out_locks; + if (lov->tgts[loi->loi_ost_idx].active) { + CERROR("error: enqueue objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + lsm->lsm_object_id, loi->loi_id, + loi->loi_ost_idx, rc); + goto out_locks; + } } } - RETURN(0); + RETURN(rc); out_locks: while (loi--, lov_lockhp--, i-- > 0) { @@ -1408,7 +1416,7 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm, lov = &export->exp_obd->u.lov; for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; - i++, loi++, lov_lockhp++ ) { + i++, loi++, lov_lockhp++) { struct lov_stripe_md submd; int err; diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index 39e8592..7952101 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -170,6 +170,9 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f) __u64 last_transno = 0; __u64 last_mount; int rc = 0; + + LASSERT(sizeof(struct mds_client_data) == MDS_LR_SIZE); + LASSERT(sizeof(struct mds_server_data) <= MDS_LR_CLIENT); OBD_ALLOC(msd, sizeof(*msd)); if (!msd) diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 6200acd..6209d75 100644 --- a/lustre/obdclass/class_obd.c +++ 
b/lustre/obdclass/class_obd.c @@ -86,12 +86,11 @@ static int obd_class_open(struct inode * inode, struct file * file) struct obd_class_user_state *ocus; ENTRY; - OBD_ALLOC (ocus, sizeof (*ocus)); + OBD_ALLOC(ocus, sizeof(*ocus)); if (ocus == NULL) return (-ENOMEM); - INIT_LIST_HEAD (&ocus->ocus_conns); - ocus->ocus_current_obd = NULL; + INIT_LIST_HEAD(&ocus->ocus_conns); file->private_data = ocus; MOD_INC_USE_COUNT; @@ -209,6 +208,9 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd, int err = 0, len = 0, serialised = 0; ENTRY; + if ((cmd & 0xffffff00) == ((int)'T') << 8) /* ignore all tty ioctls */ + RETURN(err = -ENOTTY); + switch (cmd) { case OBD_IOC_BRW_WRITE: case OBD_IOC_BRW_READ: @@ -222,7 +224,8 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd, break; } - if (!obd && cmd != OBD_IOC_DEVICE && cmd != TCGETS && + CDEBUG(D_IOCTL, "cmd = %x, obd = %p\n", cmd, obd); + if (!obd && cmd != OBD_IOC_DEVICE && cmd != OBD_IOC_LIST && cmd != OBD_GET_VERSION && cmd != OBD_IOC_NAME2DEV && cmd != OBD_IOC_NEWDEV && cmd != OBD_IOC_ADD_UUID && cmd != OBD_IOC_DEL_UUID && @@ -237,8 +240,6 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd, data = (struct obd_ioctl_data *)buf; switch (cmd) { - case TCGETS: - GOTO(out, err=-EINVAL); case OBD_IOC_DEVICE: { CDEBUG(D_IOCTL, "\n"); if (data->ioc_dev >= MAX_OBD_DEVICES || data->ioc_dev < 0) { @@ -266,6 +267,7 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd, int l; char *status; struct obd_device *obd = &obd_dev[i]; + if (!obd->obd_type) continue; if (obd->obd_flags & OBD_SET_UP) @@ -663,17 +665,17 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd, #define OBD_MINOR 241 #ifdef __KERNEL__ /* to control /dev/obd */ -static int obd_class_ioctl (struct inode * inode, struct file * filp, - unsigned int cmd, unsigned long arg) +static int obd_class_ioctl(struct inode *inode, struct file *filp, + unsigned 
int cmd, unsigned long arg) { return class_handle_ioctl(filp->private_data, cmd, arg); } /* declare character device */ static struct file_operations obd_psdev_fops = { - ioctl: obd_class_ioctl, /* ioctl */ - open: obd_class_open, /* open */ - release: obd_class_release, /* release */ + ioctl: obd_class_ioctl, /* ioctl */ + open: obd_class_open, /* open */ + release: obd_class_release, /* release */ }; /* modules setup */ @@ -712,6 +714,7 @@ void obd_kmap_get(int count, int server) if (count == 1) atomic_dec(&obd_kmap_count); else while (atomic_add_negative(-count, &obd_kmap_count)) { + struct l_wait_info lwi = { 0 }; static long next_show = 0; static int skipped = 0; @@ -729,8 +732,8 @@ void obd_kmap_get(int count, int server) skipped = 0; } else skipped++; - wait_event(obd_kmap_waitq, - atomic_read(&obd_kmap_count) >= count); + l_wait_event(obd_kmap_waitq, + atomic_read(&obd_kmap_count) >= count, &lwi); } } diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 6fcf504..bd43554 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -121,7 +121,7 @@ int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars, type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root, vars, type); - if (type->typ_procroot && IS_ERR(type->typ_procroot)) { + if (IS_ERR(type->typ_procroot)) { rc = PTR_ERR(type->typ_procroot); type->typ_procroot = NULL; list_del(&type->typ_chain); @@ -328,15 +328,14 @@ struct obd_import *class_conn2ldlmimp(struct lustre_handle *conn) struct obd_export *class_new_export(struct obd_device *obddev) { - struct obd_export * export; + struct obd_export *export; - export = kmem_cache_alloc(export_cachep, GFP_KERNEL); + PORTAL_SLAB_ALLOC(export, export_cachep, sizeof(*export)); if (!export) { CERROR("no memory! 
(minor %d)\n", obddev->obd_minor); return NULL; } - memset(export, 0, sizeof(*export)); get_random_bytes(&export->exp_cookie, sizeof(export->exp_cookie)); export->exp_obd = obddev; /* XXX this should be in LDLM init */ @@ -374,8 +373,7 @@ void class_destroy_export(struct obd_export *exp) ptlrpc_abort_inflight_superhack(&exp->exp_ldlm_data.led_import, 1); - exp->exp_cookie = DEAD_HANDLE_MAGIC; - kmem_cache_free(export_cachep, exp); + PORTAL_SLAB_FREE(exp, export_cachep, sizeof(*exp)); } /* a connection defines an export context in which preallocation can diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 9f7544b..2239762 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -515,7 +515,7 @@ echo_client_kbrw (struct obd_device *obd, int rw, } OBD_FREE(pga, npages * sizeof(*pga)); out_0: - obd_brw_set_free(set); + obd_brw_set_decref(set); return (rc); } @@ -594,7 +594,7 @@ static int echo_client_ubrw(struct obd_device *obd, int rw, out_1: OBD_FREE(pga, npages * sizeof(*pga)); out_0: - obd_brw_set_free(set); + obd_brw_set_decref(set); return (rc); } #else diff --git a/lustre/obdfilter/Makefile.am b/lustre/obdfilter/Makefile.am index 7e17804..4e4e8b1 100644 --- a/lustre/obdfilter/Makefile.am +++ b/lustre/obdfilter/Makefile.am @@ -3,7 +3,6 @@ # This code is issued under the GNU General Public License. 
# See the file COPYING in this distribution -DEFS = $(ENABLE_OST_RECOVERY) MODULE = obdfilter modulefs_DATA = obdfilter.o EXTRA_PROGRAMS = obdfilter diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 8486c22..0632af0 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -296,7 +296,7 @@ int filter_finish_transno(struct obd_export *export, void *handle, #endif if (written == sizeof(*fcd)) RETURN(0); - CERROR("error writing to last_rcvd file: rc = %d\n", written); + CERROR("error writing to last_rcvd file: rc = %d\n", (int)written); if (written >= 0) RETURN(-EIO); @@ -506,7 +506,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp, struct filter_client_data *fcd = NULL; struct inode *inode = filp->f_dentry->d_inode; unsigned long last_rcvd_size = inode->i_size; - __u64 mount_count; + __u64 mount_count = 0; int cl_idx; loff_t off = 0; int rc; @@ -545,8 +545,9 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp, ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd), &off); if (retval != sizeof(*fsd)) { - CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n"); - GOTO(out, rc = -EIO); + CDEBUG(D_INODE,"OBD filter: error reading %s\n", + LAST_RCVD); + GOTO(err_fsd, rc = -EIO); } mount_count = le64_to_cpu(fsd->fsd_mount_count); filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count); @@ -555,13 +556,13 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp, if (fsd->fsd_feature_incompat) { CERROR("unsupported feature %x\n", le32_to_cpu(fsd->fsd_feature_incompat)); - RETURN(-EINVAL); + GOTO(err_fsd, rc = -EINVAL); } if (fsd->fsd_feature_rocompat) { CERROR("read-only feature %x\n", le32_to_cpu(fsd->fsd_feature_rocompat)); /* Do something like remount filesystem read-only */ - RETURN(-EINVAL); + GOTO(err_fsd, rc = -EINVAL); } CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n", @@ -584,86 +585,89 @@ static int 
filter_init_server_data(struct obd_device *obd, struct file * filp, * the header. If we find clients with higher last_rcvd values * then those clients may need recovery done. */ - if (obd->obd_flags & OBD_REPLAYABLE) { - for (cl_idx = 0; off < last_rcvd_size; cl_idx++) { - __u64 last_rcvd; - int mount_age; - - if (!fcd) { - OBD_ALLOC(fcd, sizeof(*fcd)); - if (!fcd) - GOTO(err_fsd, rc = -ENOMEM); - } + if (!(obd->obd_flags & OBD_REPLAYABLE)) { + CERROR("%s: recovery support OFF\n", obd->obd_name); + GOTO(out, rc = 0); + } - /* Don't assume off is incremented properly, in case - * sizeof(fsd) isn't the same as fsd->fsd_client_size. - */ - off = le32_to_cpu(fsd->fsd_client_start) + - cl_idx * le16_to_cpu(fsd->fsd_client_size); - rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off); - if (rc != sizeof(*fcd)) { - CERROR("error reading FILTER %s offset %d: rc = %d\n", - LAST_RCVD, cl_idx, rc); - if (rc > 0) /* XXX fatal error or just abort reading? */ - rc = -EIO; - break; - } + for (cl_idx = 0; off < last_rcvd_size; cl_idx++) { + __u64 last_rcvd; + int mount_age; - if (fcd->fcd_uuid[0] == '\0') { - CDEBUG(D_INFO, "skipping zeroed client at offset %d\n", - cl_idx); - continue; - } + if (!fcd) { + OBD_ALLOC(fcd, sizeof(*fcd)); + if (!fcd) + GOTO(err_fsd, rc = -ENOMEM); + } - last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd); + /* Don't assume off is incremented properly, in case + * sizeof(fsd) isn't the same as fsd->fsd_client_size. + */ + off = le32_to_cpu(fsd->fsd_client_start) + + cl_idx * le16_to_cpu(fsd->fsd_client_size); + rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off); + if (rc != sizeof(*fcd)) { + CERROR("error reading FILTER %s offset %d: rc = %d\n", + LAST_RCVD, cl_idx, rc); + if (rc > 0) /* XXX fatal error or just abort reading? */ + rc = -EIO; + break; + } - /* These exports are cleaned up by filter_disconnect(), so they - * need to be set up like real exports as filter_connect() does. 
- */ - mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count); - if (mount_age < FILTER_MOUNT_RECOV) { - struct obd_export *exp = class_new_export(obd); - struct filter_export_data *fed; - CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 - " srv lr: "LPU64" mnt: "LPU64" last mount: " - LPU64"\n", fcd->fcd_uuid, cl_idx, - last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd), - le64_to_cpu(fcd->fcd_mount_count), mount_count); - /* disabled until OST recovery is actually working */ - - if (!exp) { - rc = -ENOMEM; - break; - } - memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid, - sizeof exp->exp_client_uuid.uuid); - fed = &exp->exp_filter_data; - fed->fed_fcd = fcd; - filter_client_add(filter, fed, cl_idx); - /* create helper if export init gets more complex */ - INIT_LIST_HEAD(&fed->fed_open_head); - spin_lock_init(&fed->fed_lock); - - fcd = NULL; - obd->obd_recoverable_clients++; - } else { - CDEBUG(D_INFO, - "discarded client %d UUID '%s' count "LPU64"\n", - cl_idx, fcd->fcd_uuid, - le64_to_cpu(fcd->fcd_mount_count)); - } + if (fcd->fcd_uuid[0] == '\0') { + CDEBUG(D_INFO, "skipping zeroed client at offset %d\n", + cl_idx); + continue; + } - CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n", - cl_idx, last_rcvd); + last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd); - if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd)) - filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd); + /* These exports are cleaned up by filter_disconnect(), so they + * need to be set up like real exports as filter_connect() does. 
+ */ + mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count); + if (mount_age < FILTER_MOUNT_RECOV) { + struct obd_export *exp = class_new_export(obd); + struct filter_export_data *fed; + CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 + " srv lr: "LPU64" mnt: "LPU64" last mount: " + LPU64"\n", fcd->fcd_uuid, cl_idx, + last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd), + le64_to_cpu(fcd->fcd_mount_count), mount_count); + /* disabled until OST recovery is actually working */ + + if (!exp) { + rc = -ENOMEM; + break; + } + memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid, + sizeof exp->exp_client_uuid.uuid); + fed = &exp->exp_filter_data; + fed->fed_fcd = fcd; + filter_client_add(filter, fed, cl_idx); + /* create helper if export init gets more complex */ + INIT_LIST_HEAD(&fed->fed_open_head); + spin_lock_init(&fed->fed_lock); + + fcd = NULL; + obd->obd_recoverable_clients++; + } else { + CDEBUG(D_INFO, + "discarded client %d UUID '%s' count "LPU64"\n", + cl_idx, fcd->fcd_uuid, + le64_to_cpu(fcd->fcd_mount_count)); } + CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n", + cl_idx, last_rcvd); + + if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd)) + filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd); + obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd); if (obd->obd_recoverable_clients) { - CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n", - obd->obd_recoverable_clients, + CERROR("RECOVERY: %d recoverable clients, last_rcvd " + LPU64"\n", obd->obd_recoverable_clients, le64_to_cpu(filter->fo_fsd->fsd_last_rcvd)); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; obd->obd_flags |= OBD_RECOVERING; @@ -672,16 +676,14 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp, if (fcd) OBD_FREE(fcd, sizeof(*fcd)); - } else { - CERROR("%s: recovery support OFF\n", obd->obd_name); } +out: fsd->fsd_mount_count = cpu_to_le64(mount_count + 1); /* save it,so mount count and last_recvd is 
current */ rc = filter_update_server_data(filp, filter->fo_fsd); -out: RETURN(rc); err_fsd: @@ -768,7 +770,7 @@ static int filter_prep(struct obd_device *obd) if (filter->fo_subdir_count) { O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT]; OBD_ALLOC(filter->fo_dentry_O_sub, - FILTER_SUBDIR_COUNT * sizeof(dentry)); + filter->fo_subdir_count * sizeof(dentry)); if (!filter->fo_dentry_O_sub) GOTO(err_client, rc = -ENOMEM); @@ -1144,17 +1146,14 @@ static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option); rc = PTR_ERR(mnt); - if (IS_ERR(mnt)) { - CERROR("mount of %s as type %s failed: rc %d\n", - data->ioc_inlbuf2, data->ioc_inlbuf1, rc); + if (IS_ERR(mnt)) GOTO(err_ops, rc); - } #if OST_RECOVERY obd->obd_flags |= OBD_REPLAYABLE; #endif - filter = &obd->u.filter;; + filter = &obd->u.filter; filter->fo_vfsmnt = mnt; filter->fo_fstype = strdup(data->ioc_inlbuf2); filter->fo_sb = mnt->mnt_root->d_inode->i_sb; @@ -1293,8 +1292,15 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd, RETURN(rc); exp = class_conn2export(conn); LASSERT(exp); + fed = &exp->exp_filter_data; + INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head); + spin_lock_init(&exp->exp_filter_data.fed_lock); + + if (!(obd->obd_flags & OBD_REPLAYABLE)) + RETURN(0); + OBD_ALLOC(fcd, sizeof(*fcd)); if (!fcd) { CERROR("filter: out of memory for client data\n"); @@ -1305,14 +1311,9 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd, fed->fed_fcd = fcd; fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count); - INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head); - spin_lock_init(&exp->exp_filter_data.fed_lock); - - if (obd->obd_flags & OBD_REPLAYABLE) { - rc = filter_client_add(filter, fed, -1); - if (rc) - GOTO(out_fcd, rc); - } + rc = filter_client_add(filter, fed, -1); + if (rc) + GOTO(out_fcd, rc); RETURN(rc); @@ -1355,7 +1356,7 @@ static int 
filter_disconnect(struct lustre_handle *conn) ldlm_cancel_locks_for_export(exp); - if (exp->exp_obd->obd_flags & OBD_REPLAYABLE) + if (exp->exp_obd->obd_flags & OBD_REPLAYABLE) filter_client_free(exp); rc = class_disconnect(conn); @@ -1638,7 +1639,7 @@ static int filter_create(struct lustre_handle *conn, struct obdo *oa, /* This would only happen if lastobjid was bad on disk */ CERROR("objid %s already exists\n", - filter_id(buf, filter, S_IFREG, oa->o_id)); + filter_id(buf, filter, oa->o_mode, oa->o_id)); LBUG(); GOTO(out, rc = -EEXIST); } @@ -1912,14 +1913,6 @@ struct page *filter_get_page_write(struct inode *inode, /* This page is currently locked, so get a temporary page instead. */ - /* XXX I believe this is a very dangerous thing to do - consider if - * we had multiple writers for the same file (definitely the case - * if we are using this codepath). If writer A locks the page, - * writer B writes to a copy (as here), writer A drops the page - * lock, and writer C grabs the lock before B does, then B will - * later overwrite the data from C, even if C had LDLM locked - * and initiated the write after B did. 
- */ if (!page) { unsigned long addr; CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index); @@ -2052,7 +2045,7 @@ static int filter_preprw(int cmd, struct lustre_handle *conn, o->ioo_id), o->ioo_id, 0); - if (IS_ERR(dentry)) + if (IS_ERR(dentry)) GOTO(out_objinfo, rc = PTR_ERR(dentry)); fso[i].fso_dentry = dentry; @@ -2395,6 +2388,7 @@ static int filter_san_preprw(int cmd, struct lustre_handle *conn, for (i = 0; i < objcount; i++, o++) { struct dentry *dentry; struct inode *inode; + int (*fs_bmap)(struct address_space *, long); int j; dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG, @@ -2409,15 +2403,15 @@ static int filter_san_preprw(int cmd, struct lustre_handle *conn, f_dput(dentry); GOTO(out, rc = -ENOENT); } + fs_bmap = inode->i_mapping->a_ops->bmap; for (j = 0; j < o->ioo_bufcnt; j++, rnb++) { long block; - block = rnb->offset >> PAGE_SHIFT; + block = rnb->offset >> inode->i_blkbits; if (cmd == OBD_BRW_READ) { - block = inode->i_mapping->a_ops->bmap( - inode->i_mapping, block); + block = fs_bmap(inode->i_mapping, block); } else { loff_t newsize = rnb->offset + rnb->len; /* fs_prep_san_write will also update inode @@ -2496,6 +2490,8 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst, unsigned long index = 0; int err = 0; + LBUG(); /* THIS CODE IS NOT CORRECT -phil */ + memset(&srcmd, 0, sizeof(srcmd)); memset(&dstmd, 0, sizeof(dstmd)); srcmd.lsm_object_id = src->o_id; @@ -2539,7 +2535,7 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst, page->index = index; set->brw_callback = ll_brw_sync_wait; err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL); - obd_brw_set_free(set); + obd_brw_set_decref(set); if (err) { EXIT; break; @@ -2556,7 +2552,7 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst, set->brw_callback = ll_brw_sync_wait; err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti); - obd_brw_set_free(set); + obd_brw_set_decref(set); /* XXX should 
handle dst->o_size, dst->o_blocks here */ if (err) { diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index ea205a6..515aa70 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -523,6 +523,8 @@ static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc) LASSERT(desc->bd_brw_set != NULL); LASSERT(desc->bd_brw_set->brw_callback != NULL); + /* It's important that you don't use desc->bd_brw_set after this + * callback runs. If you do, take a reference on it. */ desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH); /* We can't kunmap the desc from interrupt context, so we do it from @@ -547,7 +549,17 @@ static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc) LASSERT(desc->bd_brw_set != NULL); - ptlrpc_abort_bulk(desc); + /* XXX reconcile this with ll_sync_brw_timeout() handling, and/or + * just make osc_ptl_ev_hdlr() check desc->bd_flags for either + * PTL_BULK_FL_RCVD or PTL_BULK_FL_SENT, and pass CB_PHASE_ABORT + * to brw_callback() and do the rest of the cleanup there. I + * also think ll_sync_brw_timeout() is missing an PtlMEUnlink, + * but I could be wrong. 
+ */ + if (ptlrpc_abort_bulk(desc)) { + EXIT; + return; + } obd_brw_set_del(desc); unmap_and_decref_bulk_desc(desc); @@ -565,7 +577,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm, struct ost_body *body; int rc, size[3] = {sizeof(*body)}, mapped = 0; struct obd_ioobj *iooptr; - void *nioptr; + struct niobuf_remote *nioptr; __u32 xid; ENTRY; @@ -589,26 +601,27 @@ restart_bulk: iooptr = lustre_msg_buf(request->rq_reqmsg, 1); nioptr = lustre_msg_buf(request->rq_reqmsg, 2); - ost_pack_ioo(&iooptr, lsm, page_count); + ost_pack_ioo(iooptr, lsm, page_count); /* end almost identical to brw_write case */ xid = ptlrpc_next_xid(); /* single xid for all pages */ obd_kmap_get(page_count, 0); - for (mapped = 0; mapped < page_count; mapped++) { + for (mapped = 0; mapped < page_count; mapped++, nioptr++) { struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); if (bulk == NULL) { unmap_and_decref_bulk_desc(desc); GOTO(out_req, rc = -ENOMEM); } - bulk->bp_xid = xid; /* single xid for all pages */ + LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off); + bulk->bp_xid = xid; /* single xid for all pages */ bulk->bp_buf = kmap(pga[mapped].pg); bulk->bp_page = pga[mapped].pg; bulk->bp_buflen = PAGE_SIZE; - ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count, + ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count, pga[mapped].flag, bulk->bp_xid); } @@ -703,7 +716,7 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm, struct ost_body *body; int rc, size[3] = {sizeof(*body)}, mapped = 0; struct obd_ioobj *iooptr; - void *nioptr; + struct niobuf_remote *nioptr; __u32 xid; #if CHECKSUM_BULK __u64 cksum = 0; @@ -729,26 +742,29 @@ restart_bulk: iooptr = lustre_msg_buf(request->rq_reqmsg, 1); nioptr = lustre_msg_buf(request->rq_reqmsg, 2); - ost_pack_ioo(&iooptr, lsm, page_count); + ost_pack_ioo(iooptr, lsm, page_count); /* end almost identical to brw_read case */ xid = ptlrpc_next_xid(); /* single xid 
for all pages */ obd_kmap_get(page_count, 0); - for (mapped = 0; mapped < page_count; mapped++) { + for (mapped = 0; mapped < page_count; mapped++, nioptr++) { struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); if (bulk == NULL) { unmap_and_decref_bulk_desc(desc); GOTO(out_req, rc = -ENOMEM); } - bulk->bp_xid = xid; /* single xid for all pages */ + LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off); + bulk->bp_xid = xid; /* single xid for all pages */ bulk->bp_buf = kmap(pga[mapped].pg); bulk->bp_page = pga[mapped].pg; + /* matching ptlrpc_bulk_get assert */ + LASSERT(pga[mapped].count > 0); bulk->bp_buflen = pga[mapped].count; - ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count, + ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count, pga[mapped].flag, bulk->bp_xid); ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen); } @@ -808,6 +824,10 @@ restart_bulk: #define OSC_BRW_MAX_SIZE 65536 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE) +#warning "FIXME: make these values dynamic based on a get_info call at setup" +#define OSC_BRW_MAX_SIZE 65536 +#define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE) + static int osc_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pga, struct obd_brw_set *set, @@ -843,21 +863,20 @@ static int osc_brw(int cmd, struct lustre_handle *conn, /* Note: caller will lock/unlock, and set uptodate on the pages */ #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) static int sanosc_brw_read(struct lustre_handle *conn, - struct lov_stripe_md *md, + struct lov_stripe_md *lsm, obd_count page_count, struct brw_page *pga, struct obd_brw_set *set) { struct ptlrpc_request *request = NULL; struct ost_body *body; - struct niobuf_remote *remote, *nio_rep; - int rc, j, size[3] = {sizeof(*body)}, mapped = 0; + struct niobuf_remote *nioptr; struct obd_ioobj *iooptr; - void *nioptr; + int rc, j, size[3] = 
{sizeof(*body)}, mapped = 0; ENTRY; size[1] = sizeof(struct obd_ioobj); - size[2] = page_count * sizeof(*remote); + size[2] = page_count * sizeof(*nioptr); request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3, size, NULL); @@ -867,19 +886,20 @@ static int sanosc_brw_read(struct lustre_handle *conn, body = lustre_msg_buf(request->rq_reqmsg, 0); iooptr = lustre_msg_buf(request->rq_reqmsg, 1); nioptr = lustre_msg_buf(request->rq_reqmsg, 2); - ost_pack_ioo(&iooptr, md, page_count); + ost_pack_ioo(iooptr, lsm, page_count); obd_kmap_get(page_count, 0); - for (mapped = 0; mapped < page_count; mapped++) { + for (mapped = 0; mapped < page_count; mapped++, nioptr++) { LASSERT(PageLocked(pga[mapped].pg)); + LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off); kmap(pga[mapped].pg); - ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count, + ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count, pga[mapped].flag, 0); } - size[1] = page_count * sizeof(*remote); + size[1] = page_count * sizeof(*nioptr); request->rq_replen = lustre_msg_size(2, size); rc = ptlrpc_queue_wait(request); @@ -896,25 +916,19 @@ static int sanosc_brw_read(struct lustre_handle *conn, GOTO(out_unmap, rc = -EINVAL); } - for (j = 0; j < page_count; j++) { - ost_unpack_niobuf(&nioptr, &remote); - } - - nioptr = lustre_msg_buf(request->rq_repmsg, 1); - nio_rep = (struct niobuf_remote*)nioptr; - /* actual read */ - for (j = 0; j < page_count; j++) { + for (j = 0; j < page_count; j++, nioptr++) { struct page *page = pga[j].pg; struct buffer_head *bh; kdev_t dev; + ost_unpack_niobuf(nioptr, nioptr); /* got san device associated */ LASSERT(class_conn2obd(conn)); dev = class_conn2obd(conn)->u.cli.cl_sandev; /* hole */ - if (!nio_rep[j].offset) { + if (!nioptr->offset) { CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n", page->mapping->host->i_ino, page->index); @@ -928,7 +942,7 @@ static int sanosc_brw_read(struct lustre_handle *conn, clear_bit(BH_New, &bh->b_state); set_bit(BH_Mapped, 
&bh->b_state); - bh->b_blocknr = (unsigned long)nio_rep[j].offset; + bh->b_blocknr = (unsigned long)nioptr->offset; clear_bit(BH_Uptodate, &bh->b_state); @@ -940,8 +954,7 @@ static int sanosc_brw_read(struct lustre_handle *conn, * one we mapped before, check it */ LASSERT(!test_bit(BH_New, &bh->b_state)); LASSERT(test_bit(BH_Mapped, &bh->b_state)); - LASSERT(bh->b_blocknr == - (unsigned long)nio_rep[j].offset); + LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset); /* wait it's io completion */ if (test_bit(BH_Lock, &bh->b_state)) @@ -976,21 +989,20 @@ out_unmap: } static int sanosc_brw_write(struct lustre_handle *conn, - struct lov_stripe_md *md, + struct lov_stripe_md *lsm, obd_count page_count, struct brw_page *pga, struct obd_brw_set *set) { struct ptlrpc_request *request = NULL; struct ost_body *body; - struct niobuf_remote *remote, *nio_rep; - int rc, j, size[3] = {sizeof(*body)}, mapped = 0; + struct niobuf_remote *nioptr; struct obd_ioobj *iooptr; - void *nioptr; + int rc, j, size[3] = {sizeof(*body)}, mapped = 0; ENTRY; size[1] = sizeof(struct obd_ioobj); - size[2] = page_count * sizeof(*remote); + size[2] = page_count * sizeof(*nioptr); request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE, 3, size, NULL); @@ -1000,19 +1012,20 @@ static int sanosc_brw_write(struct lustre_handle *conn, body = lustre_msg_buf(request->rq_reqmsg, 0); iooptr = lustre_msg_buf(request->rq_reqmsg, 1); nioptr = lustre_msg_buf(request->rq_reqmsg, 2); - ost_pack_ioo(&iooptr, md, page_count); + ost_pack_ioo(iooptr, lsm, page_count); /* map pages, and pack request */ obd_kmap_get(page_count, 0); - for (mapped = 0; mapped < page_count; mapped++) { + for (mapped = 0; mapped < page_count; mapped++, nioptr++) { LASSERT(PageLocked(pga[mapped].pg)); + LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off); kmap(pga[mapped].pg); - ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count, + ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count, pga[mapped].flag, 
0); } - size[1] = page_count * sizeof(*remote); + size[1] = page_count * sizeof(*nioptr); request->rq_replen = lustre_msg_size(2, size); rc = ptlrpc_queue_wait(request); @@ -1029,19 +1042,13 @@ static int sanosc_brw_write(struct lustre_handle *conn, GOTO(out_unmap, rc = -EINVAL); } - for (j = 0; j < page_count; j++) { - ost_unpack_niobuf(&nioptr, &remote); - } - - nioptr = lustre_msg_buf(request->rq_repmsg, 1); - nio_rep = (struct niobuf_remote*)nioptr; - /* actual write */ - for (j = 0; j < page_count; j++) { + for (j = 0; j < page_count; j++, nioptr++) { struct page *page = pga[j].pg; struct buffer_head *bh; kdev_t dev; + ost_unpack_niobuf(nioptr, nioptr); /* got san device associated */ LASSERT(class_conn2obd(conn)); dev = class_conn2obd(conn)->u.cli.cl_sandev; @@ -1053,7 +1060,7 @@ static int sanosc_brw_write(struct lustre_handle *conn, LASSERT(!test_bit(BH_New, &page->buffers->b_state)); LASSERT(test_bit(BH_Mapped, &page->buffers->b_state)); LASSERT(page->buffers->b_blocknr == - (unsigned long)nio_rep[j].offset); + (unsigned long)nioptr->offset); } bh = page->buffers; @@ -1067,7 +1074,7 @@ static int sanosc_brw_write(struct lustre_handle *conn, set_bit(BH_Mapped, &bh->b_state); /* override the block nr */ - bh->b_blocknr = (unsigned long)nio_rep[j].offset; + bh->b_blocknr = (unsigned long)nioptr->offset; /* we are about to write it, so set it * uptodate/dirty @@ -1099,30 +1106,9 @@ out_unmap: goto out_req; } -#else -static int sanosc_brw_read(struct lustre_handle *conn, - struct lov_stripe_md *md, - obd_count page_count, - struct brw_page *pga, - struct obd_brw_set *set) -{ - LBUG(); - return 0; -} - -static int sanosc_brw_write(struct lustre_handle *conn, - struct lov_stripe_md *md, - obd_count page_count, - struct brw_page *pga, - struct obd_brw_set *set) -{ - LBUG(); - return 0; -} -#endif static int sanosc_brw(int cmd, struct lustre_handle *conn, - struct lov_stripe_md *md, obd_count page_count, + struct lov_stripe_md *lsm, obd_count page_count, struct 
brw_page *pga, struct obd_brw_set *set, struct obd_trans_info *oti) { @@ -1138,10 +1124,10 @@ static int sanosc_brw(int cmd, struct lustre_handle *conn, pages_per_brw = page_count; if (cmd & OBD_BRW_WRITE) - rc = sanosc_brw_write(conn, md, pages_per_brw, + rc = sanosc_brw_write(conn, lsm, pages_per_brw, pga, set); else - rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set); + rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga,set); if (rc != 0) RETURN(rc); @@ -1152,6 +1138,7 @@ static int sanosc_brw(int cmd, struct lustre_handle *conn, RETURN(0); } #endif +#endif static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm, struct lustre_handle *parent_lock, @@ -1178,7 +1165,7 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm, sizeof(extent), mode, lockh); if (rc == 1) /* We already have a lock, and it's referenced */ - RETURN(ELDLM_OK); + RETURN(ELDLM_LOCK_MATCHED); /* If we're trying to read, we also search for an existing PW lock. The * VFS and page cache already protect us locally, so lots of readers/ @@ -1202,7 +1189,7 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm, ldlm_lock_addref(lockh, LCK_PR); ldlm_lock_decref(lockh, LCK_PW); - RETURN(ELDLM_OK); + RETURN(ELDLM_LOCK_MATCHED); } } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 18a1b85..848336c 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -249,14 +249,13 @@ static int ost_brw_read(struct ptlrpc_request *req) { struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; struct ptlrpc_bulk_desc *desc; - struct obd_ioobj *tmp1; - void *tmp2, *end2; struct niobuf_remote *remote_nb; struct niobuf_local *local_nb = NULL; struct obd_ioobj *ioo; struct ost_body *body; struct l_wait_info lwi; void *desc_priv = NULL; + void *end2; int cmd, i, j, objcount, niocount, size = sizeof(*body); int rc = 0; #if CHECKSUM_BULK @@ -265,9 +264,9 @@ static int ost_brw_read(struct 
ptlrpc_request *req) ENTRY; body = lustre_msg_buf(req->rq_reqmsg, 0); - tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); - tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); - end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2]; + ioo = lustre_msg_buf(req->rq_reqmsg, 1); + remote_nb = lustre_msg_buf(req->rq_reqmsg, 2); + end2 = (char *)remote_nb + req->rq_reqmsg->buflens[2]; objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); cmd = OBD_BRW_READ; @@ -282,15 +281,29 @@ static int ost_brw_read(struct ptlrpc_request *req) if (rc) GOTO(out, req->rq_status = rc); - for (i = 0; i < objcount; i++) { - ost_unpack_ioo(&tmp1, &ioo); - if (tmp2 + ioo->ioo_bufcnt > end2) { + for (i = 0; i < objcount; i++, ioo++) { + ost_unpack_ioo(ioo, ioo); + if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) { + CERROR("BRW: objid "LPX64" count %u larger than %u\n", + ioo->ioo_id, ioo->ioo_bufcnt, + (int)(end2 - (void *)remote_nb)); LBUG(); - GOTO(out, rc = -EFAULT); + GOTO(out, rc = -EINVAL); } - for (j = 0; j < ioo->ioo_bufcnt; j++) { - /* XXX verify niobuf[j].offset > niobuf[j-1].offset */ - ost_unpack_niobuf(&tmp2, &remote_nb); + for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) { + ost_unpack_niobuf(remote_nb, remote_nb); + if (remote_nb->len == 0) { + CERROR("zero len BRW: objid "LPX64" buf %u\n", + ioo->ioo_id, j); + GOTO(out, rc = -EINVAL); + } + if (j && remote_nb->offset <= (remote_nb - 1)->offset) { + CERROR("unordered BRW: objid "LPX64 + " buf %u offset "LPX64" <= "LPX64"\n", + ioo->ioo_id, j, remote_nb->offset, + (remote_nb - 1)->offset); + GOTO(out, rc = -EINVAL); + } } } @@ -298,7 +311,7 @@ static int ost_brw_read(struct ptlrpc_request *req) if (local_nb == NULL) GOTO(out, rc = -ENOMEM); - /* The unpackers move tmp1 and tmp2, so reset them before using */ + /* The unpackers move ioo and remote_nb, so reset them before using */ ioo = lustre_msg_buf(req->rq_reqmsg, 1); remote_nb = lustre_msg_buf(req->rq_reqmsg, 2); req->rq_status = 
obd_preprw(cmd, conn, objcount, ioo, niocount, @@ -363,9 +376,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) { struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; struct ptlrpc_bulk_desc *desc; - struct obd_ioobj *tmp1; - void *tmp2, *end2; struct niobuf_remote *remote_nb; + void *end2; struct niobuf_local *local_nb = NULL; struct obd_ioobj *ioo; struct ost_body *body; @@ -376,9 +388,9 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) ENTRY; body = lustre_msg_buf(req->rq_reqmsg, 0); - tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); - tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); - end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2]; + ioo = lustre_msg_buf(req->rq_reqmsg, 1); + remote_nb = lustre_msg_buf(req->rq_reqmsg, 2); + end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2]; objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); cmd = OBD_BRW_WRITE; @@ -386,15 +398,29 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK)) GOTO(out, req->rq_status = -EIO); - for (i = 0; i < objcount; i++) { - ost_unpack_ioo(&tmp1, &ioo); - if (tmp2 + ioo->ioo_bufcnt > end2) { + for (i = 0; i < objcount; i++, ioo++) { + ost_unpack_ioo(ioo, ioo); + if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) { + CERROR("BRW: objid "LPX64" count %u larger than %u\n", + ioo->ioo_id, ioo->ioo_bufcnt, + (int)(end2 - (void *)remote_nb)); LBUG(); - GOTO(out, rc = -EFAULT); + GOTO(out, rc = -EINVAL); } - for (j = 0; j < ioo->ioo_bufcnt; j++) { - /* XXX verify niobuf[j].offset > niobuf[j-1].offset */ - ost_unpack_niobuf(&tmp2, &remote_nb); + for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) { + ost_unpack_niobuf(remote_nb, remote_nb); + if (remote_nb->len == 0) { + CERROR("zero len BRW: objid "LPX64" buf %u\n", + ioo->ioo_id, j); + GOTO(out, rc = -EINVAL); + } + if (j && remote_nb->offset 
<= (remote_nb - 1)->offset) { + CERROR("unordered BRW: objid "LPX64 + " buf %u offset "LPX64" <= "LPX64"\n", + ioo->ioo_id, j, remote_nb->offset, + (remote_nb - 1)->offset); + GOTO(out, rc = -EINVAL); + } } } @@ -402,9 +428,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) if (local_nb == NULL) GOTO(out, rc = -ENOMEM); - /* The unpackers move tmp1 and tmp2, so reset them before using */ + /* The unpackers move ioo and remote_nb, so reset them before using */ ioo = lustre_msg_buf(req->rq_reqmsg, 1); remote_nb = lustre_msg_buf(req->rq_reqmsg, 2); + req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount, remote_nb, local_nb, &desc_priv, oti); @@ -505,26 +532,28 @@ static int ost_san_brw(struct ptlrpc_request *req, int alloc) struct obd_ioobj *ioo; struct ost_body *body; int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)}; - void *tmp1, *tmp2, *end2; + void *end2; ENTRY; body = lustre_msg_buf(req->rq_reqmsg, 0); - tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); - tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); - end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2]; + ioo = lustre_msg_buf(req->rq_reqmsg, 1); + remote_nb = lustre_msg_buf(req->rq_reqmsg, 2); + end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2]; objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); - + cmd = alloc ? 
OBD_BRW_WRITE : OBD_BRW_READ; - for (i = 0; i < objcount; i++) { - ost_unpack_ioo((void *)&tmp1, &ioo); - if (tmp2 + ioo->ioo_bufcnt > end2) { - rc = -EFAULT; - break; + for (i = 0; i < objcount; i++, ioo++) { + ost_unpack_ioo(ioo, ioo); + if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) { + CERROR("BRW: objid "LPX64" count %u larger than %u\n", + ioo->ioo_id, ioo->ioo_bufcnt, + (int)(end2 - (void *)remote_nb)); + GOTO(out, rc = -EINVAL); } - for (j = 0; j < ioo->ioo_bufcnt; j++) - ost_unpack_niobuf((void *)&tmp2, &remote_nb); + for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) + ost_unpack_niobuf(remote_nb, remote_nb); } size[1] = niocount * sizeof(*remote_nb); @@ -532,12 +561,12 @@ static int ost_san_brw(struct ptlrpc_request *req, int alloc) if (rc) GOTO(out, rc); - /* The unpackers move tmp1 and tmp2, so reset them before using */ - tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); - tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); + /* The unpackers move ioo and remote_nb, so reset them before using */ + ioo = lustre_msg_buf(req->rq_reqmsg, 1); + remote_nb = lustre_msg_buf(req->rq_reqmsg, 2); - req->rq_status = obd_san_preprw(cmd, conn, objcount, tmp1, - niocount, tmp2); + req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo, + niocount, remote_nb); if (req->rq_status) { rc = 0; @@ -546,15 +575,9 @@ static int ost_san_brw(struct ptlrpc_request *req, int alloc) remote_nb = lustre_msg_buf(req->rq_repmsg, 1); res_nb = lustre_msg_buf(req->rq_reqmsg, 2); - for (i = 0; i < niocount; i++) { - /* this advances remote_nb */ - ost_pack_niobuf((void **)&remote_nb, - res_nb[i].offset, - res_nb[i].len, /* 0 */ - res_nb[i].flags, /* 0 */ - res_nb[i].xid - ); - } + for (i = 0; i < niocount; i++, remote_nb++, res_nb++) + ost_pack_niobuf(remote_nb, res_nb->offset, res_nb->len, + res_nb->flags, res_nb->xid); rc = 0; diff --git a/lustre/ptlbd/blk.c b/lustre/ptlbd/blk.c index 70ea9e4..28ca368 100644 --- a/lustre/ptlbd/blk.c +++ b/lustre/ptlbd/blk.c @@ -191,6 +191,7 @@ static void 
ptlbd_request(request_queue_t *q) spin_unlock_irq(&io_request_lock); /* XXX dunno if we're supposed to get this or not.. */ + /* __make_request() changes READA to READ - Kris */ LASSERT(req->cmd != READA); if ( req->cmd == READ ) @@ -198,7 +199,7 @@ static void ptlbd_request(request_queue_t *q) else cmd = PTLBD_WRITE; - ptlbd_send_req(ptlbd, cmd, req->bh); + ptlbd_send_req(ptlbd, cmd, req); spin_lock_irq(&io_request_lock); @@ -234,7 +235,8 @@ int ptlbd_blk_init(void) for ( i = 0 ; i < PTLBD_MAX_MINOR ; i++) { ptlbd_size_size[i] = 4096; - ptlbd_size[i] = (4096*2048) >> BLOCK_SIZE_BITS; + /* avoid integer overflow */ + ptlbd_size[i] = (16*1024*((1024*1024) >> BLOCK_SIZE_BITS)); ptlbd_hardsect_size[i] = 4096; ptlbd_max_sectors[i] = 2; //RHism ptlbd_dev_varyio[i] = 0; @@ -246,12 +248,9 @@ int ptlbd_blk_init(void) void ptlbd_blk_exit(void) { - int ret; ENTRY; blk_cleanup_queue(BLK_DEFAULT_QUEUE(PTLBD_MAJOR)); - ret = unregister_blkdev(PTLBD_MAJOR, "ptlbd"); - if ( ret ) /* XXX */ - printk("unregister_blkdev() failed: %d\n", ret); + unregister_blkdev(PTLBD_MAJOR, "ptlbd"); } #undef MAJOR_NR diff --git a/lustre/ptlbd/client.c b/lustre/ptlbd/client.c index 67d0b85..8d957db 100644 --- a/lustre/ptlbd/client.c +++ b/lustre/ptlbd/client.c @@ -83,10 +83,17 @@ static int ptlbd_cl_setup(struct obd_device *obddev, obd_count len, void *buf) static int ptlbd_cl_cleanup(struct obd_device *obddev) { -// struct ptlbd_obd *ptlbd = &obddev->u.ptlbd; + struct ptlbd_obd *ptlbd = &obddev->u.ptlbd; ENTRY; - CERROR("I should be cleaning things up\n"); + if (!ptlbd) + RETURN(-ENOENT); + + if (!ptlbd->bd_import.imp_connection) + RETURN(-ENOENT); + + ptlrpc_cleanup_client(&ptlbd->bd_import); + ptlrpc_put_connection(ptlbd->bd_import.imp_connection); RETURN(0); } diff --git a/lustre/ptlbd/main.c b/lustre/ptlbd/main.c index a95cc3f..e3fde99 100644 --- a/lustre/ptlbd/main.c +++ b/lustre/ptlbd/main.c @@ -62,6 +62,7 @@ static void __exit ptlbd_exit(void) ENTRY; ptlbd_cl_exit(); ptlbd_sv_exit(); + 
ptlbd_blk_exit(); EXIT; } diff --git a/lustre/ptlbd/rpc.c b/lustre/ptlbd/rpc.c index 4daee83..d3e5083 100644 --- a/lustre/ptlbd/rpc.c +++ b/lustre/ptlbd/rpc.c @@ -32,9 +32,14 @@ #include #include +#define RSP_OK 0 +#define RSP_NOTOK -1 +#define RQ_OK 0 + int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, - struct buffer_head *first_bh) + struct request *blkreq) { + struct buffer_head *first_bh = blkreq->bh; struct obd_import *imp = &ptlbd->bd_import; struct ptlbd_op *op; struct ptlbd_niob *niob, *niobs; @@ -108,9 +113,13 @@ int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, req->rq_level = imp->imp_level; rc = ptlrpc_queue_wait(req); - if ( rc == 0 ) { - rsp = lustre_msg_buf(req->rq_repmsg, 0); - /* XXX do stuff */ + if ( rc != 0 ) { + blkreq->errors++; + GOTO(out_desc, rc); + } + rsp = lustre_msg_buf(req->rq_repmsg, 0); + if (rsp->r_status != RSP_OK) { + blkreq->errors += rsp->r_error_cnt; } out_desc: @@ -130,11 +139,12 @@ static int ptlbd_bulk_timeout(void *data) RETURN(1); } -void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, +int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, int page_count, struct list_head *page_list) { mm_segment_t old_fs; struct list_head *pos; + int status = RSP_OK; ENTRY; old_fs = get_fs(); @@ -145,19 +155,23 @@ void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, struct page *page = list_entry(pos, struct page, list); loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) + niobs->n_offset; - - if ( op == PTLBD_READ ) - ret = filp->f_op->read(filp, page_address(page), - niobs->n_length, &offset); - else - ret = filp->f_op->write(filp, page_address(page), - niobs->n_length, &offset); + if ( op == PTLBD_READ ) { + if ((ret = filp->f_op->read(filp, page_address(page), + niobs->n_length, &offset)) != niobs->n_length) + status = ret; + goto out; + } else { + if ((ret = filp->f_op->write(filp, page_address(page), + niobs->n_length, &offset)) != niobs->n_length) + status 
= ret; + goto out; + } niobs++; } - +out: set_fs(old_fs); - EXIT; + RETURN(status); } int ptlbd_parse_req(struct ptlrpc_request *req) @@ -168,7 +182,8 @@ int ptlbd_parse_req(struct ptlrpc_request *req) struct ptlrpc_bulk_desc *desc; struct file *filp = req->rq_obd->u.ptlbd.filp; struct l_wait_info lwi; - int size[1], wait_flag, i, page_count, rc; + int size[1], wait_flag, i, page_count, rc, error_cnt = 0, + status = RSP_OK; struct list_head *pos, *n; LIST_HEAD(tmp_pages); ENTRY; @@ -199,16 +214,16 @@ int ptlbd_parse_req(struct ptlrpc_request *req) GOTO(out_bulk, rc = -ENOMEM); list_add(&bulk->bp_page->list, &tmp_pages); - /* - * XXX what about the block number? - */ bulk->bp_xid = niob->n_xid; bulk->bp_buf = page_address(bulk->bp_page); bulk->bp_buflen = niob->n_length; } if ( op->op_cmd == PTLBD_READ ) { - ptlbd_do_filp(filp, PTLBD_READ, niobs, page_count, &tmp_pages); + if ((status = ptlbd_do_filp(filp, PTLBD_READ, niobs, + page_count, &tmp_pages)) < 0) { + error_cnt++; + } rc = ptlrpc_bulk_put(desc); wait_flag = PTL_BULK_FL_SENT; } else { @@ -232,12 +247,17 @@ int ptlbd_parse_req(struct ptlrpc_request *req) if ( rsp == NULL ) GOTO(out, rc = -EINVAL); - ptlbd_do_filp(filp, PTLBD_WRITE, niobs, page_count, &tmp_pages); + if ( op->op_cmd == PTLBD_WRITE ) { + if ((status = ptlbd_do_filp(filp, PTLBD_WRITE, niobs, + page_count, &tmp_pages)) < 0) { + error_cnt++; + } + } - rsp->r_error_cnt = 42; - rsp->r_status = 69; + rsp->r_error_cnt = error_cnt; + rsp->r_status = status; /* I/O status */ + req->rq_status = RQ_OK ; /* XXX */ /* ptlbd req status */ - req->rq_status = 0; /* XXX */ ptlrpc_reply(req->rq_svc, req); out_bulk: diff --git a/lustre/ptlbd/server.c b/lustre/ptlbd/server.c index 793354d..e4a7046 100644 --- a/lustre/ptlbd/server.c +++ b/lustre/ptlbd/server.c @@ -32,6 +32,8 @@ #include #include +#define BACKING_FILE "/tmp/ptlbd-backing-file-la-la-la" + static int ptlbd_sv_already_setup = 1; static int ptlbd_sv_setup(struct obd_device *obddev, obd_count len, void 
*buf) @@ -40,8 +42,9 @@ static int ptlbd_sv_setup(struct obd_device *obddev, obd_count len, void *buf) int rc; ENTRY; - ptlbd->filp = filp_open("/tmp/ptlbd-backing-file-la-la-la", - O_RDWR|O_CREAT, 0600); + ptlbd->filp = filp_open(BACKING_FILE, + O_RDWR|O_CREAT|O_LARGEFILE, 0600); + if ( IS_ERR(ptlbd->filp) ) RETURN(PTR_ERR(ptlbd->filp)); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 7d80d5f..998c462 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -243,12 +243,13 @@ int ll_brw_sync_wait(struct obd_brw_set *set, int phase) int rc = 0; ENTRY; + obd_brw_set_addref(set); switch(phase) { case CB_PHASE_START: lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ll_sync_brw_timeout, ll_sync_brw_intr, set); rc = l_wait_event(set->brw_waitq, - atomic_read(&set->brw_refcount) == 0, &lwi); + atomic_read(&set->brw_desc_count) == 0, &lwi); list_for_each_safe(tmp, next, &set->brw_desc_head) { struct ptlrpc_bulk_desc *desc = @@ -259,12 +260,13 @@ int ll_brw_sync_wait(struct obd_brw_set *set, int phase) } break; case CB_PHASE_FINISH: - if (atomic_dec_and_test(&set->brw_refcount)) + if (atomic_dec_and_test(&set->brw_desc_count)) wake_up(&set->brw_waitq); break; default: LBUG(); } + obd_brw_set_decref(set); RETURN(rc); } @@ -294,6 +296,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode, RETURN(NULL); } + request->rq_timeout = obd_timeout; request->rq_level = LUSTRE_CONN_FULL; request->rq_type = PTL_RPC_MSG_REQUEST; request->rq_import = imp; @@ -730,7 +733,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) interrupted_request, req); } else { DEBUG_REQ(D_NET, req, "-- sleeping"); - lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_request, + lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request, interrupted_request, req); } #ifdef __KERNEL__ @@ -816,9 +819,11 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) } imp->imp_level = LUSTRE_CONN_RECOVD; spin_unlock_irqrestore(&imp->imp_lock, flags); - rc = 
imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN); - if (rc) - LBUG(); + if (imp->imp_recover != NULL) { + rc = imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN); + if (rc) + LBUG(); + } GOTO(out, rc = -EIO); } @@ -917,6 +922,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import) { unsigned long flags; struct list_head *tmp, *n; + ENTRY; /* Make sure that no new requests get processed for this import. * ptlrpc_queue_wait must (and does) hold imp_lock while testing this @@ -949,4 +955,5 @@ void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import) req->rq_import = NULL; wake_up(&req->rq_wait_for_rep); } + EXIT; } diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 62a76c4..3b1d32f 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -401,12 +401,22 @@ int ptlrpc_register_bulk_put(struct ptlrpc_bulk_desc *desc) int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc) { + int rc1, rc2; /* This should be safe: these handles are initialized to be * invalid in ptlrpc_prep_bulk() */ - PtlMDUnlink(desc->bd_md_h); - PtlMEUnlink(desc->bd_me_h); + rc1 = PtlMDUnlink(desc->bd_md_h); + if (rc1 != PTL_OK) + CERROR("PtlMDUnlink: %d\n", rc1); + rc2 = PtlMEUnlink(desc->bd_me_h); + if (rc2 != PTL_OK) + CERROR("PtlMEUnlink: %d\n", rc2); + + return rc1 ? 
rc1 : rc2; +} - return 0; +void obd_brw_set_addref(struct obd_brw_set *set) +{ + atomic_inc(&set->brw_refcount); } void obd_brw_set_add(struct obd_brw_set *set, struct ptlrpc_bulk_desc *desc) @@ -414,14 +424,14 @@ void obd_brw_set_add(struct obd_brw_set *set, struct ptlrpc_bulk_desc *desc) LASSERT(list_empty(&desc->bd_set_chain)); ptlrpc_bulk_addref(desc); - atomic_inc(&set->brw_refcount); + atomic_inc(&set->brw_desc_count); desc->bd_brw_set = set; list_add(&desc->bd_set_chain, &set->brw_desc_head); } void obd_brw_set_del(struct ptlrpc_bulk_desc *desc) { - atomic_dec(&desc->bd_brw_set->brw_refcount); + atomic_dec(&desc->bd_brw_set->brw_desc_count); list_del_init(&desc->bd_set_chain); ptlrpc_bulk_decref(desc); } @@ -435,13 +445,14 @@ struct obd_brw_set *obd_brw_set_new(void) if (set != NULL) { init_waitqueue_head(&set->brw_waitq); INIT_LIST_HEAD(&set->brw_desc_head); - atomic_set(&set->brw_refcount, 0); + atomic_set(&set->brw_refcount, 1); + atomic_set(&set->brw_desc_count, 0); } return set; } -void obd_brw_set_free(struct obd_brw_set *set) +static void obd_brw_set_free(struct obd_brw_set *set) { struct list_head *tmp, *next; ENTRY; @@ -459,6 +470,14 @@ void obd_brw_set_free(struct obd_brw_set *set) return; } +void obd_brw_set_decref(struct obd_brw_set *set) +{ + ENTRY; + if (atomic_dec_and_test(&set->brw_refcount)) + obd_brw_set_free(set); + EXIT; +} + int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req) { if (req->rq_repmsg == NULL) { diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c index 0f13acf..c0d5ba5 100644 --- a/lustre/ptlrpc/rpc.c +++ b/lustre/ptlrpc/rpc.c @@ -255,10 +255,11 @@ EXPORT_SYMBOL(ptlrpc_error); EXPORT_SYMBOL(ptlrpc_resend_req); EXPORT_SYMBOL(ptl_send_rpc); EXPORT_SYMBOL(ptlrpc_link_svc_me); -EXPORT_SYMBOL(obd_brw_set_free); EXPORT_SYMBOL(obd_brw_set_new); EXPORT_SYMBOL(obd_brw_set_add); EXPORT_SYMBOL(obd_brw_set_del); +EXPORT_SYMBOL(obd_brw_set_decref); +EXPORT_SYMBOL(obd_brw_set_addref); /* client.c */ 
EXPORT_SYMBOL(ptlrpc_init_client); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 112d01d..3338445 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -216,7 +216,7 @@ static int handle_incoming_request(struct obd_device *obddev, } CDEBUG(D_RPCTRACE, "Handling RPC ni:pid:xid:nid:opc %d:%d:"LPU64":" - LPX64":%d\n", rqbd->rqbd_srv_ni - &svc->srv_interfaces[0], + LPX64":%d\n", (int)(rqbd->rqbd_srv_ni - svc->srv_interfaces), NTOH__u32(request->rq_reqmsg->status), request->rq_xid, event->initiator.nid, NTOH__u32(request->rq_reqmsg->opc)); @@ -334,8 +334,9 @@ static int ptlrpc_main(void *arg) /* And now, loop forever on requests */ while (1) { - wait_event(svc->srv_waitq, - ptlrpc_check_event(svc, thread, event)); + struct l_wait_info lwi = { 0 }; + l_wait_event(svc->srv_waitq, + ptlrpc_check_event(svc, thread, event), &lwi); if (thread->t_flags & SVC_STOPPING) { spin_lock(&svc->srv_lock); @@ -377,12 +378,15 @@ out: static void ptlrpc_stop_thread(struct ptlrpc_service *svc, struct ptlrpc_thread *thread) { + struct l_wait_info lwi = { 0 }; + spin_lock(&svc->srv_lock); thread->t_flags = SVC_STOPPING; spin_unlock(&svc->srv_lock); wake_up(&svc->srv_waitq); - wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED)); + l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED), + &lwi); } void ptlrpc_stop_all_threads(struct ptlrpc_service *svc) @@ -404,6 +408,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc) int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, char *name) { + struct l_wait_info lwi = { 0 }; struct ptlrpc_svc_data d; struct ptlrpc_thread *thread; int rc; @@ -434,7 +439,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, OBD_FREE(thread, sizeof(*thread)); RETURN(rc); } - wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING); + l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi); RETURN(0); } diff --git 
a/lustre/scripts/lustre.spec.in b/lustre/scripts/lustre.spec.in index 6b40c41..3657c7a 100644 --- a/lustre/scripts/lustre.spec.in +++ b/lustre/scripts/lustre.spec.in @@ -71,16 +71,21 @@ rm -rf $RPM_BUILD_ROOT cd $RPM_BUILD_DIR/lustre-%{version} ./configure --with-linux='%{linuxdir}' --with-portals='%{portalsdir}' --with-portalslib='%{portalslibdir}' make + +%ifarch i386 cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version} ./configure --with-lib --with-portals='%{portalsdir}' --with-portalslib='%{portalslibdir}' make +%endif %install cd $RPM_BUILD_DIR/lustre-%{version} make install prefix=$RPM_BUILD_ROOT +%ifarch i386 cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version} make install prefix=$RPM_BUILD_ROOT +%endif # Create the pristine source directory. @@ -140,6 +145,7 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre %files -n lustre-source %attr(-, root, root) /usr/src/lustre-%{version} +%ifarch i386 %files -n liblustre %attr(-, root, root) /lib/lustre %attr(-, root, root) /lib/lustre/liblov.a @@ -158,6 +164,7 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre %attr(-, root, root) /usr/sbin/lconf %attr(-, root, root) /usr/sbin/lmc %attr(-, root, root) /usr/sbin/llanalyze +%endif %files -n lustre-ldap diff --git a/lustre/tests/ba-echo.sh b/lustre/tests/ba-echo.sh index 6d7b7f4..c0427fd 100644 --- a/lustre/tests/ba-echo.sh +++ b/lustre/tests/ba-echo.sh @@ -2,13 +2,13 @@ config=${1:-ba-echo.xml} +LMC_REAL="${LMC:-../utils/lmc} -m $config" LMC="save_cmd" -LMC_REAL="../../lustre/utils/lmc -m $config" TCPBUF=1048576 OST=${OST:-ba-ost-1} -CLIENT=client - +CLIENT=`hostname` + UUIDLIST=${UUIDLIST:-/usr/local/admin/ba-ost/UUID.txt} h2tcp () { @@ -29,7 +29,7 @@ OST_UUID=`awk "/$OST / { print \\$3 }" $UUIDLIST` # server node ${LMC} --add net --node $OST --tcpbuf $TCPBUF --nid $OST --nettype tcp -${LMC} --add ost --node $OST --ost ost1 --obdtype=obdecho $OST_UUID +${LMC} --add ost --node $OST --ost ost1 --osdtype=obdecho $OST_UUID # osc on client ${LMC} --add echo_client 
--node $CLIENT --ost ost1 diff --git a/lustre/tests/directio.c b/lustre/tests/directio.c index 3299011..f529fb0 100644 --- a/lustre/tests/directio.c +++ b/lustre/tests/directio.c @@ -18,6 +18,7 @@ int main(int argc, char **argv) int fd; char *buf; int blocks; + long len; struct stat st; int rc; @@ -41,15 +42,16 @@ int main(int argc, char **argv) printf("directio on %s for %dx%lu blocks \n", argv[1], blocks, st.st_blksize); - buf = mmap(0, blocks * st.st_blksize, PROT_READ|PROT_WRITE, - MAP_PRIVATE|MAP_ANON, 0, 0); + len = blocks * st.st_blksize; + buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0); if (!buf) { printf("No memory %s\n", strerror(errno)); return 1; } - rc = write(fd, buf, blocks * st.st_blksize); - if (rc != blocks * st.st_blksize) { + memset(buf, 0xba, len); + rc = write(fd, buf, len); + if (rc != len) { printf("Write error %s (rc = %d)\n", strerror(errno), rc); return 1; } @@ -59,8 +61,8 @@ int main(int argc, char **argv) return 1; } - rc = read(fd, buf, blocks * st.st_blksize); - if (rc != blocks * st.st_blksize) { + rc = read(fd, buf, len); + if (rc != len) { printf("Read error: %s (rc = %d)\n", strerror(errno), rc); return 1; } diff --git a/lustre/tests/lkcdmap b/lustre/tests/lkcdmap index 38e79c8..20c8c20 100755 --- a/lustre/tests/lkcdmap +++ b/lustre/tests/lkcdmap @@ -3,13 +3,11 @@ TMP=${TMP:-/tmp} LCMD=$TMP/lkcd-cmds-`hostname` echo "Storing LKCD module info in $LCMD" cat /tmp/ogdb-`hostname` | while read JUNK M JUNK; do - DIR=`dirname $M` - DIR=`cd $PWD/../$DIR; pwd` - MOD="$DIR/`basename $M`" + MOD="../$M" MAP=`echo $MOD | sed -e 's/\.o$/.map/'` - MODNAME=`basename $M | sed -e 's/\.o$//'` + MODNAME=`basename $MOD | sed -e 's/\.o$//'` nm $MOD > $MAP - echo namelist -a $MOD | tee -a $LCMD - echo symtab -a $MAP $MODNAME | tee -a $LCMD + echo namelist -a $PWD/$MOD | tee -a $LCMD + echo symtab -a $PWD/$MAP $MODNAME | tee -a $LCMD done diff --git a/lustre/tests/llechocleanup.sh b/lustre/tests/llechocleanup.sh index 
cd87766..22d7550 100755 --- a/lustre/tests/llechocleanup.sh +++ b/lustre/tests/llechocleanup.sh @@ -11,5 +11,5 @@ if [ ! -f $config ]; then sh $mkconfig $config || exit 1 fi -${LCONF} --cleanup echo.xml +${LCONF} --cleanup $NAME.xml diff --git a/lustre/tests/openclose.c b/lustre/tests/openclose.c index cc4b06d..1294b13 100644 --- a/lustre/tests/openclose.c +++ b/lustre/tests/openclose.c @@ -62,9 +62,8 @@ int main(int argc, char *argv[]) pid_t ret; ret = waitpid(0, &status, 0); - if (ret == 0) { + if (ret == 0) continue; - } if (ret < 0) { fprintf(stderr, "error: %s: wait - %s\n", @@ -85,9 +84,8 @@ int main(int argc, char *argv[]) argv[0], ret, err); if (!rc) rc = err; - - live_threads--; } + live_threads--; } } else { if (threads) @@ -115,7 +113,8 @@ int main(int argc, char *argv[]) rc = errno; break; } - if (ioctl(fd, LL_IOC_SETFLAGS, &ioctl_flags) < 0) { + if (ioctl(fd, LL_IOC_SETFLAGS, &ioctl_flags) < 0 && + errno != ENOTTY) { fprintf(stderr, "ioctl(): %s\n", strerror(errno)); rc = errno; diff --git a/lustre/tests/runas.c b/lustre/tests/runas.c new file mode 100644 index 0000000..3d29f1b --- /dev/null +++ b/lustre/tests/runas.c @@ -0,0 +1,133 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + */ +#include +#include +#include +#include +#include +#include +#include + +#define DEBUG 0 + +void +Usage_and_abort() +{ + fprintf(stderr, "Usage: runas -u user_id [ -g grp_id ]" \ + " command_to_be_run \n"); + exit(-1); +} + +// Usage: runas -u user_id [ -g grp_id ] "command_to_be_run" +// return: the return value of "command_to_be_run" +// NOTE: returning -1 might be the return code of this program itself or +// the "command_to_be_run" + +// ROOT runs "runas" for free +// Other users run "runas" requires chmod 6755 "command_to_be_run" + +int +main(int argc, char**argv) +{ + char command[1024]; + char *cmd_ptr; + int status; + int c,i; + int gid_is_set = 0; + int uid_is_set = 0; + uid_t user_id; + gid_t grp_id; + 
+ if(argc == 1) { + Usage_and_abort(); + } + + // get UID and GID + while ((c = getopt (argc, argv, "u:g:h")) != -1) { + switch (c) { + case 'u': + user_id = (uid_t)atoi(optarg); + uid_is_set = 1; + if(!gid_is_set) { + grp_id = user_id; + } + break; + + case 'g': + grp_id = (gid_t)atoi(optarg); + gid_is_set = 1; + break; + + case 'h': + Usage_and_abort (); + break; + + default: + // fprintf(stderr, "Bad parameters.\n"); + // Usage_and_abort (); + } + } + + if (!uid_is_set){ + Usage_and_abort (); + } + + + if(optind == argc) { + fprintf(stderr, "Bad parameters.\n"); + Usage_and_abort(); + } + + + // assemble the command + cmd_ptr = command ; + for (i = optind; i < argc; i++) + cmd_ptr += sprintf(cmd_ptr, "%s ", argv[i]); + + +#if DEBUG + system("whoami"); +#endif + + // set GID + status = setregid(grp_id, grp_id ); + if( status == -1) { + fprintf(stderr, "Cannot change grp_ID to %d, errno=%d (%s)\n", + grp_id, errno, strerror(errno) ); + exit(-1); + } + + // set UID + status = setreuid(user_id, user_id ); + if(status == -1) { + fprintf(stderr,"Cannot change user_ID to %d, errno=%d (%s)\n", + user_id, errno, strerror(errno) ); + exit(-1); + } + +#if DEBUG + system("whoami"); +#endif + + fprintf(stdout, "running as USER(%d), Grp (%d): \"%s\" \n", + user_id, grp_id, command ); + + // run the command + status = system(command); + + // pass the return code of command_to_be_run out of this wrapper + if (status == -1) { + fprintf(stderr, "%s: system() command failed to run\n", + argv[0]); + } + else{ + status = WEXITSTATUS(status); + fprintf(stderr, "[%s #%d] \"%s\" returns %d (%s).\n", argv[0], + user_id, argv[optind], status, strerror(status)); + + } + + return(status); +} + diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 5c4d47d..fdaf82e 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -146,7 +146,7 @@ $START log '== mkdir .../d7; mcreate .../d7/f2; echo foo > .../d7/f2 = test 7b' $MCREATE $DIR/d7/f2 -log -n foo > $DIR/d7/f2 +echo 
-n foo > $DIR/d7/f2 [ "`cat $DIR/d7/f2`" = "foo" ] || error $CHECKSTAT -t file -s 3 $DIR/d7/f2 || error pass @@ -312,6 +312,7 @@ pass $CLEAN $START + log '== O_CREAT|O_EXCL in subdir ====================== test 23' mkdir $DIR/d23 $TOEXCL $DIR/d23/f23 diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index 8e95654..a4930de 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -106,8 +106,35 @@ for C in a b c d e f g h i j k l; do done [ "`cat $MOUNT1/f11`" = "abcdefghijkl" ] && pass || error -rm -f $MOUNT1/f[0-9]* $MOUNT1/lnk +echo "test 12: file length and contents across mounts" +dd if=$SHELL of=$MOUNT1/f12 bs=4096 count=1 +$CHECKSTAT -s 4096 $MOUNT1/f12 $MOUNT2/f12 || error +dd if=$SHELL bs=4096 count=1 | \ + md5sum - $MOUNT1/f12 $MOUNT2/f12 | ( \ + read GOODSUM DASH; \ + while read SUM FILE ; do \ + [ $SUM == $GOODSUM ] || exit 2; \ + done; ) || error + +echo "test 13: open(,O_TRUNC,), close() across mounts" +dd if=$SHELL of=$MOUNT1/f13 bs=4096 count=1 +> $MOUNT1/f13 +$CHECKSTAT -s 0 $MOUNT1/f13 $MOUNT2/f13 || error + +echo "test 14: file extension while holding the fd open" +> $MOUNT1/f14 +# ugh. +touch $MOUNT1/f14-start +sh -c " + echo -n a; + mv $MOUNT1/f14-start $MOUNT1/f14-going; + while [ -f $MOUNT1/f14-going ] ; do sleep 1; done; + " >> $MOUNT1/f14 & +while [ -f $MOUNT1/f14-start ] ; do sleep 1; done; +$CHECKSTAT -s 1 $MOUNT1/f14 $MOUNT2/f14 || error +rm $MOUNT1/f14-going +rm -f $MOUNT1/f[0-9]* $MOUNT1/lnk $CLEAN exit diff --git a/lustre/tests/test_brw.c b/lustre/tests/test_brw.c index 196f32c..396f3b0 100644 --- a/lustre/tests/test_brw.c +++ b/lustre/tests/test_brw.c @@ -5,6 +5,8 @@ #include #include #include +#include +#include // not correctly in the headers yet!! //#define O_DIRECT 0 @@ -12,7 +14,6 @@ #define O_DIRECT 040000 /* direct disk access hint */ #endif -#define BLOCKSIZE 4096 #define CERROR(fmt, arg...) 
fprintf(stderr, fmt, ## arg) #ifndef __u64 #define __u64 long long @@ -91,6 +92,7 @@ int main(int argc, char **argv) long long count, last, offset; long pg_vec, len; long long objid = 3; + struct stat st; int flags = 0; int cmd = 0; char *end; @@ -131,8 +133,6 @@ int main(int argc, char **argv) usage(argv[0]); } } - len = pg_vec * BLOCKSIZE; - last = (long long)count * len; if (argc >= 6) { objid = strtoull(argv[5], &end, 0); @@ -147,13 +147,6 @@ int main(int argc, char **argv) argv[0], flags & O_DIRECT ? "directio" : "i/o", argv[1], objid, count, pg_vec); - buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0); - if (!buf) { - fprintf(stderr, "%s: no buffer memory %s\n", - argv[0], strerror(errno)); - return 2; - } - fd = open(argv[1], flags | O_LARGEFILE); if (fd == -1) { fprintf(stderr, "%s: cannot open %s: %s\n", argv[0], @@ -161,23 +154,41 @@ int main(int argc, char **argv) return 3; } + rc = fstat(fd, &st); + if (rc < 0) { + fprintf(stderr, "%s: cannot stat %s: %s\n", argv[0], + argv[1], strerror(errno)); + return 4; + } + + len = pg_vec * st.st_blksize; + last = (long long)count * len; + + buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0); + if (!buf) { + fprintf(stderr, "%s: no buffer memory %s\n", + argv[0], strerror(errno)); + return 2; + } + for (offset = 0; offset < last && cmd & WRITE; offset += len) { int i; - for (i = 0; i < len; i += BLOCKSIZE) - page_debug_setup(buf + i, BLOCKSIZE, offset + i, objid); + for (i = 0; i < len; i += st.st_blksize) + page_debug_setup(buf + i, st.st_blksize, offset + i, + objid); rc = write(fd, buf, len); - for (i = 0; i < len; i += BLOCKSIZE) { - if (page_debug_check("write", buf + i, BLOCKSIZE, + for (i = 0; i < len; i += st.st_blksize) { + if (page_debug_check("write", buf + i, st.st_blksize, offset + i, objid)) return 10; } if (rc != len) { - fprintf(stderr, "%s: write error: %s, rc %d\n", - argv[0], strerror(errno), rc); + fprintf(stderr, "%s: write error: %s, rc %d != %ld\n", + 
argv[0], strerror(errno), rc, len); return 4; } } @@ -193,13 +204,13 @@ int main(int argc, char **argv) rc = read(fd, buf, len); if (rc != len) { - fprintf(stderr, "%s: read error: %s, rc %d\n", - argv[0], strerror(errno), rc); + fprintf(stderr, "%s: read error: %s, rc %d != %ld\n", + argv[0], strerror(errno), rc, len); return 6; } - for (i = 0; i < len; i += BLOCKSIZE) { - if (page_debug_check("read", buf + i, BLOCKSIZE, + for (i = 0; i < len; i += st.st_blksize) { + if (page_debug_check("read", buf + i, st.st_blksize, offset + i, objid)) return 11; } diff --git a/lustre/utils/lctl.c b/lustre/utils/lctl.c index 1efbd8c..a143647 100644 --- a/lustre/utils/lctl.c +++ b/lustre/utils/lctl.c @@ -76,7 +76,7 @@ command_t cmdlist[] = { {"add_uuid", jt_obd_add_uuid, 0, "associate a UUID with a nid\n" "usage: add_uuid "}, {"close_uuid", jt_obd_close_uuid, 0, "disconnect a UUID\n" - "usage: close_uuid )"}, + "usage: close_uuid )"}, {"del_uuid", jt_obd_del_uuid, 0, "delete a UUID association\n" "usage: del_uuid "}, {"add_route", jt_ptl_add_route, 0, diff --git a/lustre/utils/obd.c b/lustre/utils/obd.c index c4ecc42..95e5445 100644 --- a/lustre/utils/obd.c +++ b/lustre/utils/obd.c @@ -1958,7 +1958,7 @@ int jt_obd_close_uuid(int argc, char **argv) struct obd_ioctl_data data; if (argc != 3) { - fprintf(stderr, "usage: %s \n", argv[0]); + fprintf(stderr, "usage: %s \n", argv[0]); return 0; } diff --git a/lustre/utils/obdbarrier.c b/lustre/utils/obdbarrier.c index 3363824..4373071 100644 --- a/lustre/utils/obdbarrier.c +++ b/lustre/utils/obdbarrier.c @@ -70,13 +70,13 @@ parse_kmg (uint64_t *valp, char *str) } void -usage (char *cmdname, int help) +usage (char *cmdname, int help) { char *name = strrchr (cmdname, '/'); - + if (name == NULL) name = cmdname; - + fprintf (help ? 
stdout : stderr, "usage: %s -d device -s size -o offset [-i id][-n reps][-l] oid\n", name); @@ -85,32 +85,32 @@ usage (char *cmdname, int help) int exponential_modulus (int i, int base) { - int top = base; - int mod = 1; - - for (;;) { - if (i < top) - return (i%mod == 0); - - mod = top; - top *= base; - } + int top = base; + int mod = 1; + + for (;;) { + if (i < top) + return (i%mod == 0); + + mod = top; + top *= base; + } } int -main (int argc, char **argv) +main (int argc, char **argv) { uint64_t bid = (((uint64_t)gethostid()) << 32) | getpid (); int set_bid = 0; uint64_t oid; - int setup = 0; + int setup = 0; int device = -1; - int npeers = 0; + int npeers = 0; int reps = 1; char hostname[128]; struct obdio_conn *conn; - struct obdio_barrier *b; - char *end; + struct obdio_barrier *b; + char *end; uint64_t val; int rc; int c; @@ -119,13 +119,13 @@ main (int argc, char **argv) memset (hostname, 0, sizeof (hostname)); gethostname (hostname, sizeof (hostname)); hostname[sizeof(hostname) - 1] = 0; - + while ((c = getopt (argc, argv, "hsi:d:n:p:")) != -1) switch (c) { case 'h': usage (argv[0], 1); return (0); - + case 'i': bid = strtoll (optarg, &end, 0); if (end == optarg || *end != 0) { @@ -135,11 +135,11 @@ main (int argc, char **argv) } set_bid = 1; break; - + case 's': - setup = 1; + setup = 1; break; - + case 'd': device = strtol (optarg, &end, 0); if (end == optarg || *end != 0 || device < 0) { @@ -160,7 +160,7 @@ main (int argc, char **argv) case 'p': npeers = strtol (optarg, &end, 0); - if (end == optarg || *end != 0 || npeers <= 0) { + if (end == optarg || *end != 0 || npeers <= 0) { fprintf (stderr, "Can't parse npeers %s\n", optarg); return (1); @@ -174,7 +174,7 @@ main (int argc, char **argv) if ((!setup && !set_bid) || npeers <= 0 || - device < 0 || + device < 0 || optind == argc) { fprintf (stderr, "%s not specified\n", (!setup && !set_bid) ? "id" : @@ -182,40 +182,40 @@ main (int argc, char **argv) device < 0 ? 
"device" : "object id"); return (1); } - + oid = strtoull (argv[optind], &end, 0); if (end == argv[optind] || *end != 0) { fprintf (stderr, "Can't parse object id %s\n", argv[optind]); return (1); } - + conn = obdio_connect (device); if (conn == NULL) return (1); - b = obdio_new_barrier (oid, bid, npeers); - if (b == NULL) - return (1); + b = obdio_new_barrier (oid, bid, npeers); + if (b == NULL) + return (1); rc = 0; - if (setup) { - rc = obdio_setup_barrier (conn, b); + if (setup) { + rc = obdio_setup_barrier (conn, b); if (rc == 0) printf ("Setup barrier: -d %d -i "LPX64" -p %d -n1 "LPX64"\n", device, bid, npeers, oid); - } else { - for (c = 0; c < reps; c++) { - rc = obdio_barrier (conn, b); - if (rc != 0) - break; - if (exponential_modulus (c, 10)) - printf ("%s: Barrier %d\n", hostname, c); - } - } - - free (b); - + } else { + for (c = 0; c < reps; c++) { + rc = obdio_barrier (conn, b); + if (rc != 0) + break; + if (exponential_modulus (c, 10)) + printf ("%s: Barrier %d\n", hostname, c); + } + } + + free (b); + obdio_disconnect (conn); return (rc == 0 ? 
0 : 1); diff --git a/lustre/utils/obdio.c b/lustre/utils/obdio.c index 65a4cac..8264761 100644 --- a/lustre/utils/obdio.c +++ b/lustre/utils/obdio.c @@ -30,9 +30,9 @@ #include "obdiolib.h" int -obdio_test_fixed_extent (struct obdio_conn *conn, - uint32_t myhid, uint32_t mypid, - int reps, int locked, uint64_t oid, +obdio_test_fixed_extent (struct obdio_conn *conn, + uint32_t myhid, uint32_t mypid, + int reps, int locked, uint64_t oid, uint64_t offset, uint32_t size) { struct lustre_handle fh; @@ -44,7 +44,7 @@ obdio_test_fixed_extent (struct obdio_conn *conn, int j; int rc; int rc2; - + rc = obdio_open (conn, oid, &fh); if (rc != 0) { fprintf (stderr, "Failed to open object "LPX64": %s\n", @@ -58,7 +58,7 @@ obdio_test_fixed_extent (struct obdio_conn *conn, rc = -1; goto out_0; } - + for (i = 0; i < reps; i++) { ibuf = (uint32_t *) buffer; for (j = 0; j < size / (4 * sizeof (*ibuf)); j++) { @@ -77,7 +77,7 @@ obdio_test_fixed_extent (struct obdio_conn *conn, goto out_1; } } - + rc = obdio_pwrite (conn, oid, buffer, size, offset); if (rc != 0) { fprintf (stderr, "Error writing "LPX64" @ "LPU64" for %u: %s\n", @@ -87,9 +87,9 @@ obdio_test_fixed_extent (struct obdio_conn *conn, rc = -1; goto out_1; } - + memset (buffer, 0xbb, size); - + rc = obdio_pread (conn, oid, buffer, size, offset); if (rc != 0) { fprintf (stderr, "Error reading "LPX64" @ "LPU64" for %u: %s\n", @@ -109,7 +109,7 @@ obdio_test_fixed_extent (struct obdio_conn *conn, goto out_1; } } - + ibuf = (uint32_t *) buffer; for (j = 0; j < size / (4 * sizeof (*ibuf)); j++) { if (ibuf[0] != myhid || @@ -177,20 +177,20 @@ parse_kmg (uint64_t *valp, char *str) } void -usage (char *cmdname, int help) +usage (char *cmdname, int help) { char *name = strrchr (cmdname, '/'); - + if (name == NULL) name = cmdname; - + fprintf (help ? 
stdout : stderr, "usage: %s -d device -s size -o offset [-i id][-n reps][-l] oid\n", name); } int -main (int argc, char **argv) +main (int argc, char **argv) { uint32_t mypid = getpid (); uint32_t myhid = gethostid (); @@ -214,7 +214,7 @@ main (int argc, char **argv) case 'h': usage (argv[0], 1); return (0); - + case 'i': switch (sscanf (optarg, "%i.%i", &v1, &v2)) { case 1: @@ -230,7 +230,7 @@ main (int argc, char **argv) return (1); } break; - + case 's': if (parse_kmg (&val, optarg) != 0) { fprintf (stderr, "Can't parse size %s\n", @@ -240,7 +240,7 @@ main (int argc, char **argv) size = (uint32_t)val; set_size++; break; - + case 'o': if (parse_kmg (&val, optarg) != 0) { fprintf (stderr, "Can't parse offset %s\n", @@ -282,21 +282,21 @@ main (int argc, char **argv) device < 0 ? "device" : "object id"); return (1); } - + oid = strtoull (argv[optind], &end, 0); if (end == argv[optind] || *end != 0) { fprintf (stderr, "Can't parse object id %s\n", argv[optind]); return (1); } - + conn = obdio_connect (device); if (conn == NULL) return (1); - - rc = obdio_test_fixed_extent (conn, myhid, mypid, reps, locked, + + rc = obdio_test_fixed_extent (conn, myhid, mypid, reps, locked, oid, base_offset, size); - + obdio_disconnect (conn); return (rc == 0 ? 
0 : 1); diff --git a/lustre/utils/obdiolib.c b/lustre/utils/obdiolib.c index 0404808..8c79c67 100644 --- a/lustre/utils/obdiolib.c +++ b/lustre/utils/obdiolib.c @@ -44,30 +44,30 @@ obdio_iocinit (struct obdio_conn *conn) } int -obdio_ioctl (struct obdio_conn *conn, int cmd) +obdio_ioctl (struct obdio_conn *conn, int cmd) { char *buf = conn->oc_buffer; int rc; int rc2; - + rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer)); if (rc != 0) { - fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n", + fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n", rc, strerror (errno)); abort (); } - + rc = ioctl (conn->oc_fd, cmd, buf); if (rc != 0) return (rc); - + rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer)); if (rc2 != 0) { fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n", rc2, strerror (errno)); abort (); } - + return (rc); } @@ -83,9 +83,9 @@ obdio_connect (int device) return (NULL); } memset (conn, 0, sizeof (*conn)); - - conn->oc_fd = open ("/dev/obd", O_RDWR); - if (conn->oc_fd < 0) { + + conn->oc_fd = open ("/dev/obd", O_RDWR); + if (conn->oc_fd < 0) { fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n", strerror (errno)); goto failed; @@ -99,7 +99,7 @@ obdio_connect (int device) device, strerror (errno)); goto failed; } - + obdio_iocinit (conn); rc = obdio_ioctl (conn, OBD_IOC_CONNECT); if (rc != 0) { @@ -107,18 +107,18 @@ obdio_connect (int device) device, strerror (errno)); goto failed; } - + conn->oc_conn_addr = conn->oc_data.ioc_addr; conn->oc_conn_cookie = conn->oc_data.ioc_cookie; return (conn); - + failed: free (conn); return (NULL); } void -obdio_disconnect (struct obdio_conn *conn) +obdio_disconnect (struct obdio_conn *conn) { close (conn->oc_fd); /* obdclass will automatically close on last ref */ @@ -126,18 +126,18 @@ obdio_disconnect (struct obdio_conn *conn) } int -obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) +obdio_open (struct obdio_conn *conn, uint64_t 
oid, struct lustre_handle *fh) { int rc; - + obdio_iocinit (conn); - + conn->oc_data.ioc_obdo1.o_id = oid; conn->oc_data.ioc_obdo1.o_mode = S_IFREG; conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE; - + rc = obdio_ioctl (conn, OBD_IOC_OPEN); - + if (rc == 0) memcpy (fh, obdo_handle(&conn->oc_data.ioc_obdo1), sizeof (*fh)); @@ -145,26 +145,26 @@ obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) } int -obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) +obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) { obdio_iocinit (conn); - + conn->oc_data.ioc_obdo1.o_id = oid; conn->oc_data.ioc_obdo1.o_mode = S_IFREG; memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), fh, sizeof (*fh)); - conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | + conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLHANDLE; - + return (obdio_ioctl (conn, OBD_IOC_CLOSE)); } int -obdio_pread (struct obdio_conn *conn, uint64_t oid, - char *buffer, uint32_t count, uint64_t offset) +obdio_pread (struct obdio_conn *conn, uint64_t oid, + char *buffer, uint32_t count, uint64_t offset) { obdio_iocinit (conn); - + conn->oc_data.ioc_obdo1.o_id = oid; conn->oc_data.ioc_obdo1.o_mode = S_IFREG; conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE; @@ -178,11 +178,11 @@ obdio_pread (struct obdio_conn *conn, uint64_t oid, } int -obdio_pwrite (struct obdio_conn *conn, uint64_t oid, - char *buffer, uint32_t count, uint64_t offset) +obdio_pwrite (struct obdio_conn *conn, uint64_t oid, + char *buffer, uint32_t count, uint64_t offset) { obdio_iocinit (conn); - + conn->oc_data.ioc_obdo1.o_id = oid; conn->oc_data.ioc_obdo1.o_mode = S_IFREG; conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE; @@ -201,9 +201,9 @@ obdio_enqueue (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *lh) { int rc; - + obdio_iocinit 
(conn); - + conn->oc_data.ioc_obdo1.o_id = oid; conn->oc_data.ioc_obdo1.o_mode = S_IFREG; conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE; @@ -211,12 +211,12 @@ obdio_enqueue (struct obdio_conn *conn, uint64_t oid, conn->oc_data.ioc_conn1 = mode; conn->oc_data.ioc_count = count; conn->oc_data.ioc_offset = offset; - + rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE); - + if (rc == 0) memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh)); - + return (rc); } @@ -227,40 +227,40 @@ obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh) memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh)); conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE; - + return (obdio_ioctl (conn, ECHO_IOC_CANCEL)); } void * -obdio_alloc_aligned_buffer (void **spacep, int size) +obdio_alloc_aligned_buffer (void **spacep, int size) { int pagesize = getpagesize(); void *space = malloc (size + pagesize - 1); - + *spacep = space; if (space == NULL) return (NULL); - + return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1))); } struct obdio_barrier * -obdio_new_barrier (uint64_t oid, uint64_t id, int npeers) +obdio_new_barrier (uint64_t oid, uint64_t id, int npeers) { - struct obdio_barrier *b; - - b = (struct obdio_barrier *)malloc (sizeof (*b)); - if (b == NULL) { - fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid); - return (NULL); - } - - b->ob_id = id; - b->ob_oid = oid; - b->ob_npeers = npeers; - b->ob_ordinal = 0; - b->ob_count = 0; - return (b); + struct obdio_barrier *b; + + b = (struct obdio_barrier *)malloc (sizeof (*b)); + if (b == NULL) { + fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid); + return (NULL); + } + + b->ob_id = id; + b->ob_oid = oid; + b->ob_npeers = npeers; + b->ob_ordinal = 0; + b->ob_count = 0; + return (b); } int @@ -273,86 +273,86 @@ obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b) void *space; struct obdio_barrier *fileb; - if 
(b->ob_ordinal != 0 || - b->ob_count != 0) { - fprintf (stderr, "obdio_setup_barrier: invalid parameter\n"); - abort (); - } - + if (b->ob_ordinal != 0 || + b->ob_count != 0) { + fprintf (stderr, "obdio_setup_barrier: invalid parameter\n"); + abort (); + } + rc = obdio_open (conn, b->ob_oid, &fh); if (rc != 0) { fprintf (stderr, "obdio_setup_barrier "LPX64": Failed to open object: %s\n", b->ob_oid, strerror (errno)); return (rc); } - + fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ()); if (fileb == NULL) { fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n", - b->ob_oid); + b->ob_oid); rc = -1; goto out_0; } - + memset (fileb, 0, getpagesize ()); - *fileb = *b; - + *fileb = *b; + rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh); if (rc != 0) { fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n", b->ob_oid, strerror (errno)); goto out_1; } - + rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0); - if (rc != 0) - fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n", - b->ob_oid, strerror (errno)); - - rc2 = obdio_cancel (conn, &lh); - if (rc == 0 && rc2 != 0) { - fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n", - b->ob_oid, strerror (errno)); - rc = rc2; - } + if (rc != 0) + fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n", + b->ob_oid, strerror (errno)); + + rc2 = obdio_cancel (conn, &lh); + if (rc == 0 && rc2 != 0) { + fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n", + b->ob_oid, strerror (errno)); + rc = rc2; + } out_1: - free (space); + free (space); out_0: - rc2 = obdio_close (conn, b->ob_oid, &fh); - if (rc == 0 && rc2 != 0) { - fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n", - b->ob_oid, strerror (errno)); - rc = rc2; - } - - return (rc); + rc2 = obdio_close (conn, b->ob_oid, &fh); + if (rc == 0 && rc2 != 0) { + fprintf (stderr, 
"obdio_setup_barrier "LPX64": Error on close: %s\n", + b->ob_oid, strerror (errno)); + rc = rc2; + } + + return (rc); } int obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b) { struct lustre_handle fh; - struct lustre_handle lh; - int rc; - int rc2; + struct lustre_handle lh; + int rc; + int rc2; void *space; struct obdio_barrier *fileb; - char *mode; - - rc = obdio_open (conn, b->ob_oid, &fh); - if (rc != 0) { - fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n", - b->ob_oid, strerror (errno)); - return (rc); - } - + char *mode; + + rc = obdio_open (conn, b->ob_oid, &fh); + if (rc != 0) { + fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n", + b->ob_oid, strerror (errno)); + return (rc); + } + fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ()); - if (fileb == NULL) { - fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n", - b->ob_oid); - rc = -1; - goto out_0; - } + if (fileb == NULL) { + fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n", + b->ob_oid); + rc = -1; + goto out_0; + } rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh); if (rc != 0) { @@ -360,107 +360,107 @@ obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b) b->ob_oid, strerror (errno)); goto out_1; } - - memset (fileb, 0xeb, getpagesize ()); - rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0); - if (rc != 0) { - fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n", - b->ob_oid, strerror (errno)); - goto out_2; - } - - if (fileb->ob_id != b->ob_id || - fileb->ob_oid != b->ob_oid || - fileb->ob_npeers != b->ob_npeers || - fileb->ob_count >= b->ob_npeers || - fileb->ob_ordinal != b->ob_ordinal) { - fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id); - fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n", - fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, - fileb->ob_ordinal, 
fileb->ob_count); - fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n", - b->ob_id, b->ob_oid, b->ob_npeers, - b->ob_ordinal, b->ob_count); - rc = -1; - goto out_2; - } - - fileb->ob_count++; - if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */ - fileb->ob_count = 0; /* join count for next barrier */ - fileb->ob_ordinal++; /* signal all joined */ - } - - rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0); - if (rc != 0) { - fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n", - b->ob_oid, strerror (errno)); - goto out_2; - } - - mode = "PW"; - b->ob_ordinal++; /* now I wait... */ - while (fileb->ob_ordinal != b->ob_ordinal) { - - rc = obdio_cancel (conn, &lh); - if (rc != 0) { - fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n", - b->ob_oid, mode, strerror (errno)); - goto out_1; - } - - mode = "PR"; - rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh); - if (rc != 0) { - fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n", - b->ob_oid, strerror (errno)); - goto out_1; - } - - memset (fileb, 0xeb, getpagesize ()); - rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0); - if (rc != 0) { - fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n", - b->ob_oid, strerror (errno)); - goto out_2; - } - - if (fileb->ob_id != b->ob_id || - fileb->ob_oid != b->ob_oid || - fileb->ob_npeers != b->ob_npeers || - fileb->ob_count >= b->ob_npeers || - (fileb->ob_ordinal != b->ob_ordinal - 1 && - fileb->ob_ordinal != b->ob_ordinal)) { - fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id); - fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n", - fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, - fileb->ob_ordinal, fileb->ob_count); - fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n", - b->ob_id, b->ob_oid, b->ob_npeers, - b->ob_ordinal, b->ob_count); - rc = -1; - goto out_2; - } - } - + + 
memset (fileb, 0xeb, getpagesize ()); + rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0); + if (rc != 0) { + fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n", + b->ob_oid, strerror (errno)); + goto out_2; + } + + if (fileb->ob_id != b->ob_id || + fileb->ob_oid != b->ob_oid || + fileb->ob_npeers != b->ob_npeers || + fileb->ob_count >= b->ob_npeers || + fileb->ob_ordinal != b->ob_ordinal) { + fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id); + fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n", + fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, + fileb->ob_ordinal, fileb->ob_count); + fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n", + b->ob_id, b->ob_oid, b->ob_npeers, + b->ob_ordinal, b->ob_count); + rc = -1; + goto out_2; + } + + fileb->ob_count++; + if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */ + fileb->ob_count = 0; /* join count for next barrier */ + fileb->ob_ordinal++; /* signal all joined */ + } + + rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0); + if (rc != 0) { + fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n", + b->ob_oid, strerror (errno)); + goto out_2; + } + + mode = "PW"; + b->ob_ordinal++; /* now I wait... 
*/ + while (fileb->ob_ordinal != b->ob_ordinal) { + + rc = obdio_cancel (conn, &lh); + if (rc != 0) { + fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n", + b->ob_oid, mode, strerror (errno)); + goto out_1; + } + + mode = "PR"; + rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh); + if (rc != 0) { + fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n", + b->ob_oid, strerror (errno)); + goto out_1; + } + + memset (fileb, 0xeb, getpagesize ()); + rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0); + if (rc != 0) { + fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n", + b->ob_oid, strerror (errno)); + goto out_2; + } + + if (fileb->ob_id != b->ob_id || + fileb->ob_oid != b->ob_oid || + fileb->ob_npeers != b->ob_npeers || + fileb->ob_count >= b->ob_npeers || + (fileb->ob_ordinal != b->ob_ordinal - 1 && + fileb->ob_ordinal != b->ob_ordinal)) { + fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id); + fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n", + fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, + fileb->ob_ordinal, fileb->ob_count); + fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n", + b->ob_id, b->ob_oid, b->ob_npeers, + b->ob_ordinal, b->ob_count); + rc = -1; + goto out_2; + } + } + out_2: - rc2 = obdio_cancel (conn, &lh); - if (rc == 0 && rc2 != 0) { - fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n", - b->ob_oid, strerror (errno)); - rc = rc2; - } + rc2 = obdio_cancel (conn, &lh); + if (rc == 0 && rc2 != 0) { + fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n", + b->ob_oid, strerror (errno)); + rc = rc2; + } out_1: - free (space); + free (space); out_0: - rc2 = obdio_close (conn, b->ob_oid, &fh); - if (rc == 0 && rc2 != 0) { - fprintf (stderr, "obdio_barrier "LPX64": Error on close: %s\n", - b->ob_oid, strerror (errno)); - rc = rc2; - } - - return (rc); + rc2 = obdio_close (conn, b->ob_oid, &fh); + if (rc 
== 0 && rc2 != 0) { + fprintf (stderr, "obdio_barrier "LPX64": Error on close: %s\n", + b->ob_oid, strerror (errno)); + rc = rc2; + } + + return (rc); } - + -- 1.8.3.1