Whamcloud - gitweb
merge b_devel into HEAD. Includes:
author: pschwan <pschwan>
Tue, 11 Mar 2003 23:37:27 +0000 (23:37 +0000)
committer: pschwan <pschwan>
Tue, 11 Mar 2003 23:37:27 +0000 (23:37 +0000)
- client-side I/O cache
- O_DIRECT fixes for IA64
- liblustre improvements
- various small bug fixes

66 files changed:
lustre/ChangeLog
lustre/Makefile.am
lustre/conf/slapd-lustre.conf
lustre/configure.in
lustre/extN/ext3-2.5-noread.diff [new file with mode: 0644]
lustre/extN/extN-noread.diff
lustre/include/liblustre.h
lustre/include/linux/lprocfs_status.h
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd_class.h
lustre/include/linux/obd_ost.h
lustre/include/linux/obd_ptlbd.h
lustre/include/linux/obd_support.h
lustre/kernel_patches/patches/iod-stock-24-exports_hp.patch [new file with mode: 0644]
lustre/kernel_patches/series/hp-pnnl
lustre/kernel_patches/series/rh-8.0 [deleted file]
lustre/ldlm/ldlm_lockd.c
lustre/lib/obd_pack.c
lustre/lib/target.c
lustre/liblustre/Makefile.am
lustre/liblustre/libtest.c
lustre/llite/Makefile.am
lustre/llite/commit_callback.c
lustre/llite/file.c
lustre/llite/iod.c [new file with mode: 0644]
lustre/llite/rw.c
lustre/llite/super.c
lustre/llite/super25.c
lustre/lov/lov_obd.c
lustre/mds/mds_fs.c
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdecho/echo_client.c
lustre/obdfilter/Makefile.am
lustre/obdfilter/filter.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlbd/blk.c
lustre/ptlbd/client.c
lustre/ptlbd/main.c
lustre/ptlbd/rpc.c
lustre/ptlbd/server.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/rpc.c
lustre/ptlrpc/service.c
lustre/scripts/lustre.spec.in
lustre/tests/ba-echo.sh
lustre/tests/directio.c
lustre/tests/lkcdmap
lustre/tests/llechocleanup.sh
lustre/tests/openclose.c
lustre/tests/runas.c [new file with mode: 0644]
lustre/tests/sanity.sh
lustre/tests/sanityN.sh
lustre/tests/test_brw.c
lustre/utils/lctl.c
lustre/utils/obd.c
lustre/utils/obdbarrier.c
lustre/utils/obdio.c
lustre/utils/obdiolib.c

index 61193c7..97789a8 100644 (file)
@@ -1,5 +1,5 @@
-TBD
-       * version v0_5_21
+2003-03-11  Phil Schwan  <phil@clusterfs.com>
+       * version v0_6
        * bug fixes
        - LDLM_DEBUG macro fix, for gcc 3.2 (850)
        - failed open()s could cause deadlock; fixed (867, 869)
@@ -17,6 +17,11 @@ TBD
        - if a bad lock AST arrives, send an error instead of dropping entirely
        - return 0 from revalidate2 if ll_intent_lock returns -EINTR (912)
        - fix leak in bulk IO when only partially completed (899, 900, 926)
+       - fix O_DIRECT for ia64 (55)
+        - (almost) eliminate Lustre-kernel-thread effects on load average (722)
+       - C-z after timeout could hang a process forever; fixed (977)
+       * Features
+       - client-side I/O cache (678, 924, 929, 941, 970)
        * protocol changes
        - READPAGE and SETATTRs which don't take server-side locks get
          their own portal
index 9f837ad..7ad7358 100644 (file)
@@ -22,7 +22,7 @@ SUBDIRS = $(DIRS24) obdclass mds utils ptlrpc ldlm lib obdfilter mdc osc ost
 SUBDIRS+= llite obdecho lov cobd tests doc scripts conf
 endif
 
-DIST_SUBDIRS = $(SUBDIRS)
+DIST_SUBDIRS = $(SUBDIRS) liblustre
 EXTRA_DIST = BUGS FDL Rules include archdep.m4 kernel_patches
 
 # We get the version from the spec file.
index de89c76..7906908 100644 (file)
@@ -1,6 +1,6 @@
 #######################################################################
 # lustre ldap config database
-# $Id: slapd-lustre.conf,v 1.2 2003/01/06 22:17:53 adilger Exp $
+# $Id: slapd-lustre.conf,v 1.3 2003/03/11 23:36:45 pschwan Exp $
 #######################################################################
 
 database       ldbm
index 6384d30..5c5f438 100644 (file)
@@ -57,13 +57,29 @@ fi
 AC_SUBST(LIBREADLINE)
 AC_SUBST(HAVE_LIBREADLINE)
 
+AC_ARG_ENABLE(efence,  [  --enable-efence  use efence library],,
+                       enable_efence="no")
+if test "$enable_efence" = "yes" ; then
+   LIBEFENCE="-lefence"
+   HAVE_LIBEFENCE="-DHAVE_LIBEFENCE=1"
+else 
+   LIBEFENCE=""
+   HAVE_LIBEFENCE=""
+fi
+AC_SUBST(LIBEFENCE)
+AC_SUBST(HAVE_LIBEFENCE)
+
 # XXX this should be a runtime option
+AC_MSG_CHECKING(if you are enabling OST recovery...)
 AC_ARG_ENABLE(ost_recovery, [  --enable-ost-recovery: enable support for ost recovery],,
-             enable_ost_recovery="yes")
+             enable_ost_recovery="no")
 if test "$enable_ost_recovery" = "yes" ; then
    ENABLE_OST_RECOVERY="-DOST_RECOVERY=1"
+   AC_MSG_RESULT(yes)
 else 
-   HAVE_LIBREADLINE=""
+   ENABLE_OST_RECOVERY=""
+   AC_MSG_RESULT(no)
 fi
 AC_SUBST(ENABLE_OST_RECOVERY)
 
@@ -131,7 +147,7 @@ KINCFLAGS='-I$(top_srcdir)/include -I$(PORTALS)/include -I$(LINUX)/include'
 else
 KINCFLAGS='-I$(top_srcdir)/include -I$(PORTALS)/include'
 fi
-CPPFLAGS="$KINCFLAGS $ARCHCPPFLAGS"
+CPPFLAGS="$KINCFLAGS $ARCHCPPFLAGS $ENABLE_OST_RECOVERY"
 
 if test $host_cpu != "lib" ; then 
 AC_MSG_CHECKING(if make dep has been run in kernel source (host $host_cpu) )
@@ -153,7 +169,7 @@ AC_MSG_CHECKING(for Linux release)
 dnl We need to rid ourselves of the nasty [ ] quotes.
 changequote(, )
 dnl Get release from version.h
-RELEASE="`sed -ne 's/.*UTS_RELEASE[ \"]*\([0-9.a-zA-Z-]*\).*/\1/p' $LINUX/include/linux/version.h`"
+RELEASE="`sed -ne 's/.*UTS_RELEASE[ \"]*\([0-9.a-zA-Z_-]*\).*/\1/p' $LINUX/include/linux/version.h`"
 changequote([, ])
 
 moduledir='$(libdir)/modules/'$RELEASE/kernel
diff --git a/lustre/extN/ext3-2.5-noread.diff b/lustre/extN/ext3-2.5-noread.diff
new file mode 100644 (file)
index 0000000..f1c611f
--- /dev/null
@@ -0,0 +1,266 @@
+===== fs/ext3/ialloc.c 1.26 vs edited =====
+--- 1.26/fs/ext3/ialloc.c      Fri Feb 14 19:24:09 2003
++++ edited/fs/ext3/ialloc.c    Sat Mar  8 01:20:55 2003
+@@ -195,6 +195,36 @@
+ }
+ /*
++ * @block_group: block group of inode
++ * @offset: relative offset of inode within @block_group
++ *
++ * Check whether any of the inodes in this disk block are in use.
++ *
++ * Caller must be holding superblock lock (group/bitmap read lock in
++ * future).
++ */
++int ext3_itable_block_used(struct super_block *sb, unsigned int block_group,
++                         int offset)
++{
++      struct buffer_head *ibitmap = read_inode_bitmap(sb, block_group);
++      int inodes_per_block;
++      unsigned long inum, iend;
++
++      if (!ibitmap)
++              return 1;
++
++      inodes_per_block = sb->s_blocksize / EXT3_SB(sb)->s_inode_size;
++      inum = offset & ~(inodes_per_block - 1);
++      iend = inum + inodes_per_block;
++      for (; inum < iend; inum++) {
++              if (inum != offset && ext3_test_bit(inum, ibitmap->b_data))
++                      return 1;
++      }
++
++      return 0;
++}
++
++/*
+  * There are two policies for allocating an inode.  If the new inode is
+  * a directory, then a forward search is made for a block group with both
+  * free space and a low directory-to-inode ratio; if that fails, then of
+@@ -422,8 +452,9 @@
+       struct ext3_group_desc * gdp;
+       struct ext3_super_block * es;
+       struct ext3_inode_info *ei;
+-      int err = 0;
++      struct ext3_iloc iloc;
+       struct inode *ret;
++      int err = 0;
+       /* Cannot create files in a deleted directory */
+       if (!dir || !dir->i_nlink)
+@@ -587,16 +618,23 @@
+               goto fail2;
+       }
+       err = ext3_init_acl(handle, inode, dir);
++      if (err)
++              goto fail3;
++
++      err = ext3_get_inode_loc_new(inode, &iloc, 1);
++      if (err)
++              goto fail3;
++
++      BUFFER_TRACE(iloc->bh, "get_write_access");
++      err = ext3_journal_get_write_access(handle, iloc.bh);
+       if (err) {
+-              DQUOT_FREE_INODE(inode);
+-              goto fail2;
+-      }
+-      err = ext3_mark_inode_dirty(handle, inode);
+-      if (err) {
+-              ext3_std_error(sb, err);
+-              DQUOT_FREE_INODE(inode);
+-              goto fail2;
+-      }
++              brelse(iloc.bh);
++              iloc.bh = NULL;
++              goto fail3;
++      }
++      err = ext3_mark_iloc_dirty(handle, inode, &iloc);
++      if (err)
++              goto fail3;
+       ext3_debug("allocating inode %lu\n", inode->i_ino);
+       goto really_out;
+@@ -610,6 +648,9 @@
+       brelse(bitmap_bh);
+       return ret;
++fail3:
++      ext3_std_error(sb, err);
++      DQUOT_FREE_INODE(inode);
+ fail2:
+       inode->i_flags |= S_NOQUOTA;
+       inode->i_nlink = 0;
+===== fs/ext3/inode.c 1.62 vs edited =====
+--- 1.62/fs/ext3/inode.c       Fri Feb 14 19:24:09 2003
++++ edited/fs/ext3/inode.c     Sat Mar  8 02:10:39 2003
+@@ -2144,69 +2144,118 @@
+       unlock_kernel();
+ }
+-/* 
+- * ext3_get_inode_loc returns with an extra refcount against the
+- * inode's underlying buffer_head on success. 
+- */
++#define NUM_INODE_PREREAD 16
+-int ext3_get_inode_loc (struct inode *inode, struct ext3_iloc *iloc)
++/*
++ * ext3_get_inode_loc returns with an extra refcount against the inode's
++ * underlying buffer_head on success.  If this is for a new inode allocation
++ * (new is non-zero) then we may be able to optimize away the read if there
++ * are no other in-use inodes in this inode table block.  If we need to do
++ * a read, then read in a whole chunk of blocks to avoid blocking again soon
++ * if we are doing lots of creates/updates.
++ */
++int ext3_get_inode_loc_new(struct inode *inode, struct ext3_iloc *iloc, int new)
+ {
+-      struct buffer_head *bh = 0;
++      struct buffer_head *bh[NUM_INODE_PREREAD];
++      struct super_block *sb = inode->i_sb;
++      struct ext3_sb_info *sbi = EXT3_SB(sb);
++      unsigned long ino = inode->i_ino;
+       unsigned long block;
+       unsigned long block_group;
+       unsigned long group_desc;
+       unsigned long desc;
+       unsigned long offset;
+       struct ext3_group_desc * gdp;
+-              
+-      if ((inode->i_ino != EXT3_ROOT_INO &&
+-              inode->i_ino != EXT3_JOURNAL_INO &&
+-              inode->i_ino < EXT3_FIRST_INO(inode->i_sb)) ||
+-              inode->i_ino > le32_to_cpu(
+-                      EXT3_SB(inode->i_sb)->s_es->s_inodes_count)) {
+-              ext3_error (inode->i_sb, "ext3_get_inode_loc",
+-                          "bad inode number: %lu", inode->i_ino);
++
++      if ((ino != EXT3_ROOT_INO && ino != EXT3_JOURNAL_INO &&
++           ino < EXT3_FIRST_INO(sb)) ||
++          ino > le32_to_cpu(sbi->s_es->s_inodes_count)) {
++              ext3_error(sb, "ext3_get_inode_loc", "bad inode number: %lu",
++                         ino);
+               goto bad_inode;
+       }
+-      block_group = (inode->i_ino - 1) / EXT3_INODES_PER_GROUP(inode->i_sb);
+-      if (block_group >= EXT3_SB(inode->i_sb)->s_groups_count) {
+-              ext3_error (inode->i_sb, "ext3_get_inode_loc",
+-                          "group >= groups count");
++      block_group = (ino - 1) / EXT3_INODES_PER_GROUP(sb);
++      if (block_group >= EXT3_SB(sb)->s_groups_count) {
++              ext3_error(sb, "ext3_get_inode_loc", "group >= groups count");
+               goto bad_inode;
+       }
+-      group_desc = block_group >> EXT3_DESC_PER_BLOCK_BITS(inode->i_sb);
+-      desc = block_group & (EXT3_DESC_PER_BLOCK(inode->i_sb) - 1);
+-      bh = EXT3_SB(inode->i_sb)->s_group_desc[group_desc];
+-      if (!bh) {
+-              ext3_error (inode->i_sb, "ext3_get_inode_loc",
+-                          "Descriptor not loaded");
++      group_desc = block_group >> EXT3_DESC_PER_BLOCK_BITS(sb);
++      desc = block_group & (EXT3_DESC_PER_BLOCK(sb) - 1);
++      if (!sbi->s_group_desc[group_desc]) {
++              ext3_error(sb, "ext3_get_inode_loc", "Descriptor not loaded");
+               goto bad_inode;
+       }
+-      gdp = (struct ext3_group_desc *) bh->b_data;
++      gdp = (struct ext3_group_desc *)(sbi->s_group_desc[group_desc]->b_data);
+       /*
+        * Figure out the offset within the block group inode table
+        */
+-      offset = ((inode->i_ino - 1) % EXT3_INODES_PER_GROUP(inode->i_sb)) *
+-              EXT3_INODE_SIZE(inode->i_sb);
++      offset = ((ino - 1) % EXT3_INODES_PER_GROUP(sb));
+       block = le32_to_cpu(gdp[desc].bg_inode_table) +
+-              (offset >> EXT3_BLOCK_SIZE_BITS(inode->i_sb));
+-      if (!(bh = sb_bread(inode->i_sb, block))) {
+-              ext3_error (inode->i_sb, "ext3_get_inode_loc",
+-                          "unable to read inode block - "
+-                          "inode=%lu, block=%lu", inode->i_ino, block);
+-              goto bad_inode;
++              (offset * sbi->s_inode_size >> EXT3_BLOCK_SIZE_BITS(sb));
++      bh[0] = sb_getblk(sb, block);
++      if (buffer_uptodate(bh[0]))
++              goto done;
++
++      /* If we don't really need to read this block, and it isn't already
++       * in memory, then we just zero it out.  Otherwise, we keep the
++       * current block contents (deleted inode data) for posterity.
++       */
++      if (new && !ext3_itable_block_used(sb, block_group, offset)) {
++              lock_buffer(bh[0]);
++              memset(bh[0]->b_data, 0, bh[0]->b_size);
++              set_buffer_uptodate(bh[0]);
++              unlock_buffer(bh[0]);
++      } else {
++              unsigned long block_end, itable_end;
++              int count = 1;
++
++              itable_end = le32_to_cpu(gdp[desc].bg_inode_table) +
++                      sbi->s_itb_per_group;
++              block_end = block + NUM_INODE_PREREAD;
++              if (block_end > itable_end)
++                      block_end = itable_end;
++
++              for (; block < block_end; block++) {
++                      bh[count] = sb_getblk(sb, block);
++                      if (count && (buffer_uptodate(bh[count]) ||
++                                    buffer_locked(bh[count]))) {
++                              __brelse(bh[count]);
++                      } else
++                              count++;
++              }
++
++              ll_rw_block(READ, count, bh);
++
++              /* Release all but the block we actually need (bh[0]) */
++              while (--count > 0)
++                      __brelse(bh[count]);
++
++              wait_on_buffer(bh[0]);
++              if (!buffer_uptodate(bh[0])) {
++                      ext3_error(sb, __FUNCTION__,
++                                 "unable to read inode block - "
++                                 "inode=%lu, block=%llu", ino,
++                                 (unsigned long long)bh[0]->b_blocknr);
++                      goto bad_inode;
++              }
+       }
+-      offset &= (EXT3_BLOCK_SIZE(inode->i_sb) - 1);
++done:
++      offset = (offset * sbi->s_inode_size) & (EXT3_BLOCK_SIZE(sb) - 1);
+-      iloc->bh = bh;
+-      iloc->raw_inode = (struct ext3_inode *) (bh->b_data + offset);
++      iloc->bh = bh[0];
++      iloc->raw_inode = (struct ext3_inode *)(bh[0]->b_data + offset);
+       iloc->block_group = block_group;
+-      
++
+       return 0;
+-      
++
+  bad_inode:
+       return -EIO;
++}
++
++int ext3_get_inode_loc(struct inode *inode, struct ext3_iloc *iloc)
++{
++      return ext3_get_inode_loc_new(inode, iloc, 0);
+ }
+ void ext3_read_inode(struct inode * inode)
+===== include/linux/ext3_fs.h 1.22 vs edited =====
+--- 1.22/include/linux/ext3_fs.h       Tue Jan 14 00:56:29 2003
++++ edited/include/linux/ext3_fs.h     Sat Mar  8 01:56:28 2003
+@@ -719,6 +719,8 @@
+ extern struct buffer_head * ext3_getblk (handle_t *, struct inode *, long, int, int *);
+ extern struct buffer_head * ext3_bread (handle_t *, struct inode *, int, int, int *);
++extern int ext3_itable_block_used(struct super_block *, unsigned int, int);
++extern int ext3_get_inode_loc_new(struct inode *, struct ext3_iloc *, int);
+ extern int  ext3_get_inode_loc (struct inode *, struct ext3_iloc *);
+ extern void ext3_read_inode (struct inode *);
+ extern void ext3_write_inode (struct inode *, int);
index 463516c..56220e2 100644 (file)
@@ -80,7 +80,7 @@ diff -ru lustre-head/fs/extN/ialloc.c lustre/fs/extN/ialloc.c
 diff -ru lustre-head/fs/extN/inode.c lustre/fs/extN/inode.c
 --- lustre-head/fs/extN/inode.c        Mon Dec 23 10:02:58 2002
 +++ lustre/fs/extN/inode.c     Mon Dec 23 09:50:25 2002
-@@ -2011,23 +1994,32 @@
+@@ -2011,23 +1994,28 @@
        extN_journal_stop(handle, inode);
  }
  
@@ -88,11 +88,7 @@ diff -ru lustre-head/fs/extN/inode.c lustre/fs/extN/inode.c
 - * extN_get_inode_loc returns with an extra refcount against the
 - * inode's underlying buffer_head on success. 
 - */
-+extern int extN_itable_block_used(struct super_block *sb,
-+                                unsigned int block_group,
-+                                int offset);
-+
-+#define NUM_INODE_PREREAD 16
++#define NUM_INODE_PREREAD     16
  
 -int extN_get_inode_loc (struct inode *inode, struct extN_iloc *iloc)
 +/*
@@ -223,3 +219,15 @@ diff -ru lustre-head/fs/extN/inode.c lustre/fs/extN/inode.c
  void extN_read_inode(struct inode * inode)
  {
        struct extN_iloc iloc;
+diff -ru include/linux/extN_fs.h.orig include/linux/extN_fs.h
+--- lustre/include/linux/extN_fs.h.orig        Sat Mar  8 01:23:09 2003
++++ lustre/include/linux/extN_fs.h     Sat Mar  8 01:24:31 2003
+@@ -642,6 +646,8 @@
+ extern struct buffer_head * extN_getblk (handle_t *, struct inode *, long, int, int *);
+ extern struct buffer_head * extN_bread (handle_t *, struct inode *, int, int, int *);
++extern int extN_itable_block_used(struct super_block *sb, unsigned int, int);
++extern int extN_get_inode_loc_new(struct inode *, struct extN_iloc *, int);
+ extern int  extN_get_inode_loc (struct inode *, struct extN_iloc *);
+ extern void extN_read_inode (struct inode *);
+ extern void extN_write_inode (struct inode *, int);
index 0b37021..1e57ea4 100644 (file)
@@ -93,7 +93,7 @@ static inline void *kmalloc(int size, int prot)
 #define kfree(a) free(a)
 #define GFP_KERNEL 1
 #define GFP_HIGHUSER 1
-#define IS_ERR(a) (abs((int)(a)) < 500 ? 1 : 0)
+#define IS_ERR(a) (((a) && abs((int)(a)) < 500) ? 1 : 0)
 #define PTR_ERR(a) ((int)(a))
 
 #define capable(foo) 1
@@ -258,7 +258,7 @@ static inline struct page *alloc_pages(mask,foo)
         if (!pg)
                 return NULL;
 #ifdef MAP_ANONYMOUS
-        pg->addr = mmap(0, PAGE_SIZE, PROT_WRITE, MAP_ANONYMOUS, 0, 0);
+        pg->addr = mmap(0, PAGE_SIZE, PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
 #else
         pg->addr = malloc(PAGE_SIZE);
 #endif
@@ -347,7 +347,6 @@ extern struct task_struct *current;
 
 #define init_waitqueue_head(l) INIT_LIST_HEAD(&(l)->sleepers)
 #define wake_up(l) do { int a; a++; } while (0)
-#define wait_event(l,m) do { int a; a++; } while (0)
 #define TASK_INTERRUPTIBLE 0
 #define TASK_UNINTERRUPTIBLE 1
 #define TASK_RUNNING 2
index cd8f12b..d0060fc 100644 (file)
@@ -51,6 +51,7 @@ struct lprocfs_static_vars {
 extern struct proc_dir_entry *proc_lustre_root;
 
 
+#ifdef LPROCFS
 #define LPROCFS_INIT_MULTI_VARS(array, size)                              \
 void lprocfs_init_multi_vars(unsigned int idx,                            \
                              struct lprocfs_static_vars *x)               \
@@ -62,7 +63,6 @@ void lprocfs_init_multi_vars(unsigned int idx,                            \
    x->obd_vars = glob[idx].obd_vars;                                      \
 }                                                                         \
 
-#ifdef LPROCFS
 #define LPROCFS_INIT_VARS(vclass, vinstance)           \
 void lprocfs_init_vars(struct lprocfs_static_vars *x)  \
 {                                                      \
@@ -135,6 +135,10 @@ int fct_name(char *page, char **start, off_t off,                \
 static inline struct proc_dir_entry *
 lprocfs_register(const char *name, struct proc_dir_entry *parent,
                  struct lprocfs_vars *list, void *data) { return NULL; }
+#define LPROCFS_INIT_MULTI_VARS(array, size)
+static inline void lprocfs_init_multi_vars(unsigned int idx,
+                                           struct lprocfs_static_vars *x) { return; }
+#define LPROCFS_INIT_VARS(vclass, vinstance)
 static inline void lprocfs_init_vars(struct lprocfs_static_vars *x) { return; }
 static inline int lprocfs_add_vars(struct proc_dir_entry *root,
                                    struct lprocfs_vars *var,
index c120225..70e7e87 100644 (file)
@@ -26,9 +26,11 @@ typedef enum {
         ELDLM_LOCK_CHANGED = 300,
         ELDLM_LOCK_ABORTED = 301,
         ELDLM_LOCK_REPLACED = 302,
+        ELDLM_LOCK_MATCHED = 303,
 
         ELDLM_NAMESPACE_EXISTS = 400,
-        ELDLM_BAD_NAMESPACE    = 401
+        ELDLM_BAD_NAMESPACE    = 401,
+        ELDLM_GETATTR_ERROR    = 402
 } ldlm_error_t;
 
 #define LDLM_NAMESPACE_SERVER 0
@@ -135,7 +137,7 @@ struct ldlm_namespace {
  *
  */
 
-#define RES_HASH_BITS 14
+#define RES_HASH_BITS 10
 #define RES_HASH_SIZE (1UL << RES_HASH_BITS)
 #define RES_HASH_MASK (RES_HASH_SIZE - 1)
 
@@ -342,6 +344,8 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int flags);
 void ldlm_cancel_callback(struct ldlm_lock *);
 int ldlm_lock_set_data(struct lustre_handle *, void *data, void *cp_data);
 void ldlm_lock_remove_from_lru(struct ldlm_lock *);
+struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *,
+                                      struct lustre_handle *);
 
 static inline struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *h)
 {
index 3ef86ac..b99d996 100644 (file)
@@ -314,6 +314,7 @@ struct obd_statfs {
 #define OBD_BRW_WRITE  0x2
 #define OBD_BRW_RWMASK (OBD_BRW_READ | OBD_BRW_WRITE)
 #define OBD_BRW_CREATE 0x4
+#define OBD_BRW_SYNC   0x8
 
 #define OBD_OBJECT_EOF 0xffffffffffffffffULL
 
@@ -322,7 +323,7 @@ struct obd_ioobj {
         obd_gr               ioo_gr;
         __u32                ioo_type;
         __u32                ioo_bufcnt;
-};
+} __attribute__((packed));
 
 struct niobuf_remote {
         __u64 offset;
index 41c67ff..6f38be0 100644 (file)
@@ -29,6 +29,8 @@
 # include <string.h>
 #else
 # include <asm/semaphore.h>
+# include <linux/sched.h>
+# include <linux/signal.h>
 #endif
 #include <linux/types.h>
 #include <linux/portals_lib.h>
@@ -106,6 +108,7 @@ struct obd_brw_set {
         struct list_head brw_desc_head; /* list of ptlrpc_bulk_desc */
         wait_queue_head_t brw_waitq;
         atomic_t brw_refcount;
+        atomic_t brw_desc_count;
         int brw_flags;
 
         int (*brw_callback)(struct obd_brw_set *, int phase);
@@ -575,35 +578,45 @@ struct l_wait_info {
         lwi_cb_data:    data                                                   \
 })
 
+#define LUSTRE_FATAL_SIGS (sigmask(SIGKILL) | sigmask(SIGINT) |                \
+                           sigmask(SIGTERM) | sigmask(SIGQUIT))
+
 #ifdef __KERNEL__
-#define l_sigismember sigismember
-#else
-#define l_sigismember(a,b) (*(a) & b)
-#endif
+static inline sigset_t l_w_e_set_sigs(int sigs)
+{
+        sigset_t old;
+        unsigned long irqflags;
 
-/* XXX this should be one mask-check */
-#define l_killable_pending(task)                                               \
-(l_sigismember(&(task->pending.signal), SIGKILL) ||                              \
- l_sigismember(&(task->pending.signal), SIGINT) ||                               \
- l_sigismember(&(task->pending.signal), SIGTERM))
+        spin_lock_irqsave(&current->sigmask_lock, irqflags);
+        old = current->blocked;
+        siginitsetinv(&current->blocked, sigs);
+        recalc_sigpending(current);
+        spin_unlock_irqrestore(&current->sigmask_lock, irqflags);
+
+        return old;
+}
 
 #define __l_wait_event(wq, condition, info, ret)                               \
 do {                                                                           \
         wait_queue_t __wait;                                                   \
-        long __state;                                                          \
         int __timed_out = 0;                                                   \
-        init_waitqueue_entry(&__wait, current);                                \
+        unsigned long irqflags;                                                \
+        sigset_t blocked;                                                      \
                                                                                \
+        init_waitqueue_entry(&__wait, current);                                \
         add_wait_queue(&wq, &__wait);                                          \
+                                                                               \
+        /* Block all signals (just the non-fatal ones if no timeout). */       \
         if (info->lwi_signals && !info->lwi_timeout)                           \
-            __state = TASK_INTERRUPTIBLE;                                      \
+            blocked = l_w_e_set_sigs(LUSTRE_FATAL_SIGS);                       \
         else                                                                   \
-            __state = TASK_UNINTERRUPTIBLE;                                    \
+            blocked = l_w_e_set_sigs(0);                                       \
+                                                                               \
         for (;;) {                                                             \
-            set_current_state(__state);                                        \
+            set_current_state(TASK_INTERRUPTIBLE);                             \
             if (condition)                                                     \
                     break;                                                     \
-            if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) {\
+            if (signal_pending(current)) {                                     \
                 if (info->lwi_on_signal)                                       \
                         info->lwi_on_signal(info->lwi_cb_data);                \
                 ret = -EINTR;                                                  \
@@ -618,21 +631,19 @@ do {                                                                           \
                         break;                                                 \
                     }                                                          \
                     /* We'll take signals after a timeout. */                  \
-                    if (info->lwi_signals) {                                   \
-                        __state = TASK_INTERRUPTIBLE;                          \
-                        /* Check for a pending interrupt. */                   \
-                        if (info->lwi_signals && l_killable_pending(current)) {\
-                            if (info->lwi_on_signal)                           \
-                                info->lwi_on_signal(info->lwi_cb_data);        \
-                            ret = -EINTR;                                      \
-                            break;                                             \
-                        }                                                      \
-                    }                                                          \
+                    if (info->lwi_signals)                                     \
+                        (void)l_w_e_set_sigs(LUSTRE_FATAL_SIGS);               \
                 }                                                              \
             } else {                                                           \
                 schedule();                                                    \
             }                                                                  \
         }                                                                      \
+                                                                               \
+        spin_lock_irqsave(&current->sigmask_lock, irqflags);                   \
+        current->blocked = blocked;                                            \
+        recalc_sigpending(current);                                            \
+        spin_unlock_irqrestore(&current->sigmask_lock, irqflags);              \
+                                                                               \
         current->state = TASK_RUNNING;                                         \
         remove_wait_queue(&wq, &__wait);                                       \
 } while(0)
@@ -645,5 +656,6 @@ do {                                                                           \
                 __l_wait_event(wq, condition, __info, __ret);                  \
         __ret;                                                                 \
 })
+#endif /* __KERNEL__ */
 
 #endif /* _LUSTRE_LIB_H */
index 0c29c79..9657f24 100644 (file)
@@ -56,11 +56,39 @@ struct ll_inode_info {
         struct lov_stripe_md *lli_smd;
         char                 *lli_symlink_name;
         struct semaphore      lli_open_sem;
+        atomic_t              lli_open_count; /* see ll_file_release */
+        /*
+         * the VALID flag and valid_sem are temporary measures to serialize
+         * the manual getattrs that we're doing at lock acquisition.  in
+         * the future the OST will always return its notion of the file
+         * size with the granted locks.
+         */
+        unsigned long         lli_flags;
+#define LLI_F_DID_GETATTR      0
+        struct semaphore      lli_getattr_sem;
+        struct list_head      lli_read_extents;
+        spinlock_t            lli_read_extent_lock;
+
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         struct inode          lli_vfs_inode;
 #endif
 };
 
+/*
+ * this lets ll_file_read tell ll_readpages how far ahead it can read
+ * and still be covered by ll_file_read's lock.  2.5 won't need this, but
+ * we have the other problem of other readpage callers making sure that
+ * they're covered by a lock..  
+ */
+struct ll_read_extent {
+        struct list_head re_lli_item;
+        struct task_struct *re_task;
+        struct ldlm_extent re_extent;
+};
+
+int ll_check_dirty( struct super_block *sb );
+int ll_batch_writepage( struct inode *inode, struct page *page );
+
 /* interpet return codes from intent lookup */
 #define LL_LOOKUP_POSITIVE 1
 #define LL_LOOKUP_NEGATIVE 2
@@ -246,11 +274,15 @@ extern struct inode_operations ll_special_inode_operations;
 struct ldlm_lock;
 int ll_lock_callback(struct ldlm_lock *, struct ldlm_lock_desc *, void *data,
                      int flag);
-int ll_size_lock(struct inode *, struct lov_stripe_md *, obd_off start,
-                 int mode, struct lustre_handle *);
-int ll_size_unlock(struct inode *, struct lov_stripe_md *, int mode,
-                   struct lustre_handle *);
-int ll_file_size(struct inode *inode, struct lov_stripe_md *md, char *ostdata);
+int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
+                   struct lov_stripe_md *lsm, int mode,
+                   struct ldlm_extent *extent, struct lustre_handle *lockh);
+int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
+                   struct lov_stripe_md *lsm, int mode,
+                   struct ldlm_extent *extent, struct lustre_handle *lockh);
+int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
+                     struct lov_stripe_md *lsm, int mode,
+                     struct lustre_handle *lockh);
 int ll_create_objects(struct super_block *sb, obd_id id, uid_t uid,
                       gid_t gid, struct lov_stripe_md **lsmp);
 
index 0a881b1..c951637 100644 (file)
@@ -138,7 +138,7 @@ struct mds_client_data {
         __u64 mcd_last_xid;     /* xid for the last transaction */
         __u32 mcd_last_result;  /* result from last RPC */
         __u32 mcd_last_data;    /* per-op data (disposition for open &c.) */
-        __u8 padding[MDS_LR_SIZE - 58];
+        __u8 padding[MDS_LR_SIZE - 74];
 };
 
 /* In-memory access to client data from MDS struct */
index 8c50212..6966424 100644 (file)
@@ -177,6 +177,7 @@ struct ptlrpc_request {
         int rq_reqlen;
         struct lustre_msg *rq_reqmsg;
 
+        int rq_timeout;
         int rq_replen;
         struct lustre_msg *rq_repmsg;
         __u64 rq_transno;
@@ -368,7 +369,8 @@ int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
 struct obd_brw_set *obd_brw_set_new(void);
 void obd_brw_set_add(struct obd_brw_set *, struct ptlrpc_bulk_desc *);
 void obd_brw_set_del(struct ptlrpc_bulk_desc *);
-void obd_brw_set_free(struct obd_brw_set *);
+void obd_brw_set_decref(struct obd_brw_set *set);
+void obd_brw_set_addref(struct obd_brw_set *set);
 
 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
 int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req);
index f626bab..b571b06 100644 (file)
@@ -34,7 +34,7 @@
 #include <linux/types.h>
 #include <linux/fs.h>
 #include <linux/time.h>
-#endif 
+#endif
 
 #include <linux/obd_support.h>
 #include <linux/lustre_import.h>
@@ -770,6 +770,33 @@ static inline void obdo_from_inode(struct obdo *dst, struct inode *src,
         dst->o_valid |= (valid & ~OBD_MD_FLID);
 }
 
+/*
+ * Refresh an inode from an obdo without ever moving an attribute backwards.
+ * Only fields requested in @valid and also flagged in src->o_valid are
+ * considered, and each of atime, mtime, ctime, size and blocks is copied
+ * only when the obdo value is strictly greater than what the inode holds.
+ * On 2.5+ kernels the time comparisons and stores use the tv_sec member.
+ */
+static inline void obdo_refresh_inode(struct inode *dst, struct obdo *src,
+                                      obd_flag valid)
+{
+        valid &= src->o_valid;
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+        if (valid & OBD_MD_FLATIME && src->o_atime > dst->i_atime)
+                dst->i_atime = src->o_atime;
+        if (valid & OBD_MD_FLMTIME && src->o_mtime > dst->i_mtime)
+                dst->i_mtime = src->o_mtime;
+        if (valid & OBD_MD_FLCTIME && src->o_ctime > dst->i_ctime)
+                dst->i_ctime = src->o_ctime;
+#else
+        if (valid & OBD_MD_FLATIME && src->o_atime > dst->i_atime.tv_sec)
+                dst->i_atime.tv_sec = src->o_atime;
+        if (valid & OBD_MD_FLMTIME && src->o_mtime > dst->i_mtime.tv_sec)
+                dst->i_mtime.tv_sec = src->o_mtime;
+        if (valid & OBD_MD_FLCTIME && src->o_ctime > dst->i_ctime.tv_sec)
+                dst->i_ctime.tv_sec = src->o_ctime;
+#endif
+        if (valid & OBD_MD_FLSIZE && src->o_size > dst->i_size)
+                dst->i_size = src->o_size;
+        /* allocation of space */
+        if (valid & OBD_MD_FLBLOCKS && src->o_blocks > dst->i_blocks)
+                dst->i_blocks = src->o_blocks;
+}
+
 static inline void obdo_to_inode(struct inode *dst, struct obdo *src,
                                  obd_flag valid)
 {
index f8d1486..9ef7052 100644 (file)
 #define LUSTRE_SANOST_NAME "sanost"
 
 /* ost/ost_pack.c */
-void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
-                     __u32 xid);
-void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp);
-void ost_pack_ioo(struct obd_ioobj **ioop, struct lov_stripe_md *oa,int bufcnt);
-void ost_unpack_ioo(struct obd_ioobj **tmp, struct obd_ioobj **ioop);
+void ost_pack_niobuf(struct niobuf_remote *nb, __u64 offset, __u32 len,
+                     __u32 flags, __u32 xid);
+void ost_unpack_niobuf(struct niobuf_remote *dst, struct niobuf_remote *src);
+void ost_pack_ioo(struct obd_ioobj *ioo, struct lov_stripe_md *lsm, int bufcnt);
+void ost_unpack_ioo(struct obd_ioobj *dst, struct obd_ioobj *src);
 
 #endif
index b4f9fe9..3af66b5 100644 (file)
@@ -24,7 +24,7 @@ extern void ptlbd_sv_exit(void);
 
 extern void ptlbd_blk_register(struct ptlbd_obd *ptlbd);
 extern int ptlbd_send_req(struct ptlbd_obd *, ptlbd_cmd_t cmd, 
-               struct buffer_head *);
+               struct request *);
 extern int ptlbd_parse_req(struct ptlrpc_request *req);
 
 #endif
index a4d676d..85e577a 100644 (file)
@@ -124,13 +124,21 @@ extern unsigned long obd_sync_filter;
                               ((obd_fail_loc & (OBD_FAILED | OBD_FAIL_ONCE))!= \
                                 (OBD_FAILED | OBD_FAIL_ONCE)))
 
-#define OBD_FAIL_RETURN(id, ret)                                             \
-do {                                                                         \
+#define OBD_FAIL_CHECK_ONCE(id)                                              \
+({      int _ret_ = 0;                                                       \
         if (OBD_FAIL_CHECK(id)) {                                            \
-                CERROR("obd_fail_loc=%x, fail operation rc=%d\n", id, ret);  \
+                CERROR("obd_fail_loc=%x\n", id);                             \
                 obd_fail_loc |= OBD_FAILED;                                  \
                 if ((id) & OBD_FAIL_ONCE)                                    \
                         obd_fail_loc |= OBD_FAIL_ONCE;                       \
+                _ret_ = 1;                                                   \
+        }                                                                    \
+        _ret_;                                                               \
+})
+
+#define OBD_FAIL_RETURN(id, ret)                                             \
+do {                                                                         \
+        if (OBD_FAIL_CHECK_ONCE(id)) {                                       \
                 RETURN(ret);                                                 \
         }                                                                    \
 } while(0)
diff --git a/lustre/kernel_patches/patches/iod-stock-24-exports_hp.patch b/lustre/kernel_patches/patches/iod-stock-24-exports_hp.patch
new file mode 100644 (file)
index 0000000..669b44d
--- /dev/null
@@ -0,0 +1,41 @@
+--- linux-2.4.19-hp2_pnnl4_Lv13/fs/inode.c.iod-export  2003-02-27 14:28:04.000000000 -0800
++++ linux-2.4.19-hp2_pnnl4_Lv13/fs/inode.c     2003-03-03 13:54:59.000000000 -0800
+@@ -5,6 +5,7 @@
+  */
+ #include <linux/config.h>
++#include <linux/module.h>
+ #include <linux/fs.h>
+ #include <linux/string.h>
+ #include <linux/mm.h>
+@@ -66,7 +67,8 @@
+  * NOTE! You also have to own the lock if you change
+  * the i_state of an inode while it is in use..
+  */
+-static spinlock_t inode_lock = SPIN_LOCK_UNLOCKED;
++spinlock_t inode_lock = SPIN_LOCK_UNLOCKED;
++EXPORT_SYMBOL(inode_lock);
+ /*
+  * Statistics gathering..
+--- linux-2.4.19-hp2_pnnl4_Lv13/fs/Makefile.iod-export 2003-02-27 14:28:01.000000000 -0800
++++ linux-2.4.19-hp2_pnnl4_Lv13/fs/Makefile    2003-03-03 13:56:11.000000000 -0800
+@@ -7,7 +7,7 @@
+ O_TARGET := fs.o
+-export-objs :=        filesystems.o open.o dcache.o buffer.o dquot.o
++export-objs :=        filesystems.o open.o dcache.o buffer.o dquot.o inode.o
+ mod-subdirs :=        nls xfs
+ obj-y :=      open.o read_write.o devices.o file_table.o buffer.o \
+--- linux-2.4.19-hp2_pnnl4_Lv13/mm/page_alloc.c.iod-export     2003-02-27 14:28:01.000000000 -0800
++++ linux-2.4.19-hp2_pnnl4_Lv13/mm/page_alloc.c        2003-03-03 13:54:59.000000000 -0800
+@@ -28,6 +28,7 @@
+ LIST_HEAD(inactive_list);
+ LIST_HEAD(active_list);
+ pg_data_t *pgdat_list;
++EXPORT_SYMBOL(pgdat_list);
+ /* Used to look up the address of the struct zone encoded in page->zone */
+ zone_t *zone_table[MAX_NR_ZONES*MAX_NR_NODES];
index 6723ab6..bf276fb 100644 (file)
@@ -5,3 +5,4 @@ jbd-transno-cb.patch
 lustre_version.patch
 vfs_intent_hp.patch
 invalidate_show.patch
+iod-stock-24-exports_hp.patch
diff --git a/lustre/kernel_patches/series/rh-8.0 b/lustre/kernel_patches/series/rh-8.0
deleted file mode 100644 (file)
index 2ba39f5..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-dev_read_only.patch
-exports.patch
-kmem_cache_validate.patch
-lustre_version.patch
-uml_check_get_page.patch
-uml_no_panic.patch
-vfs_intent.patch
-uml_compile_fixes.patch
-invalidate_show.patch
index c1d3182..dafcb6e 100644 (file)
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
-#include <linux/module.h>
-#include <linux/slab.h>
-#include <linux/init.h>
-#else 
-#include <liblustre.h>
+# include <linux/module.h>
+# include <linux/slab.h>
+# include <linux/init.h>
+#else
+# include <liblustre.h>
 #endif
 
 #include <linux/lustre_dlm.h>
 #include <linux/obd_class.h>
 
-
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 extern struct lustre_lock ldlm_handle_lock;
@@ -189,6 +188,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         req->rq_level = LUSTRE_CONN_RECOVD;
+        req->rq_timeout = 2;
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
                 ldlm_del_waiting_lock(lock);
@@ -236,6 +236,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         req->rq_replen = lustre_msg_size(0, NULL);
 
         req->rq_level = LUSTRE_CONN_RECOVD;
+        req->rq_timeout = 2;
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
                 ldlm_del_waiting_lock(lock);
@@ -434,28 +435,21 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
-                                      struct lustre_handle *handle);
-
-static int ldlm_handle_bl_callback(struct ptlrpc_request *req,
-                                   struct ldlm_namespace *ns)
+static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
+                                    struct ldlm_namespace *ns,
+                                    struct ldlm_request *dlm_req,
+                                    struct ldlm_lock *lock)
 {
-        struct ldlm_request *dlm_req;
-        struct ldlm_lock *lock;
         int do_ast;
         ENTRY;
 
-        OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_AST, 0);
-
-        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-
-        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
-        if (!lock) {
-                CDEBUG(D_INFO, "blocking callback on lock "LPX64
-                       " - lock disappeared\n", dlm_req->lock_handle1.cookie);
-                RETURN(-EINVAL);
-        }
-
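+        /* Try to narrow down this damn iozone bug.  NOTE: the NULL test
+         * below only logs; the lr_type check right after it still
+         * dereferences lock->l_resource, so a NULL resource faults there. */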
+        if (lock->l_resource == NULL)
+                CERROR("lock %p resource NULL\n", lock);
+        if (lock->l_resource->lr_type != LDLM_EXTENT)
+                if (lock->l_resource->lr_namespace != ns)
+                        CERROR("lock %p namespace %p != passed ns %p\n", lock,
+                               lock->l_resource->lr_namespace, ns);
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
 
         l_lock(&ns->ns_lock);
@@ -476,28 +470,17 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req,
 
         LDLM_DEBUG(lock, "client blocking callback handler END");
         LDLM_LOCK_PUT(lock);
-        RETURN(0);
+        EXIT;
 }
 
-static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
-                                   struct ldlm_namespace *ns)
+static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
+                                    struct ldlm_namespace *ns,
+                                    struct ldlm_request *dlm_req,
+                                    struct ldlm_lock *lock)
 {
-        struct list_head ast_list = LIST_HEAD_INIT(ast_list);
-        struct ldlm_request *dlm_req;
-        struct ldlm_lock *lock;
+        LIST_HEAD(ast_list);
         ENTRY;
 
-        OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_CP_AST, 0);
-
-        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-
-        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
-        if (!lock) {
-                CERROR("completion callback on lock "LPX64" - lock "
-                       "disappeared\n", dlm_req->lock_handle1.cookie);
-                RETURN(-EINVAL);
-        }
-
         LDLM_DEBUG(lock, "client completion callback handler START");
 
         l_lock(&ns->ns_lock);
@@ -530,12 +513,24 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
-        RETURN(0);
+        EXIT;
+}
+
+/*
+ * Send an empty reply for a callback request.  @rc is stored in
+ * req->rq_status; the return value is the lustre_pack_msg() error if
+ * packing the zero-buffer reply fails, otherwise whatever ptlrpc_reply()
+ * returns.
+ */
+static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
+{
+        req->rq_status = rc;
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+                             &req->rq_repmsg);
+        if (rc)
+                return rc;
+        return ptlrpc_reply(req->rq_svc, req);
 }
 
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
         struct ldlm_namespace *ns;
+        struct ldlm_request *dlm_req;
+        struct ldlm_lock *lock;
         int rc;
         ENTRY;
 
@@ -556,8 +551,17 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
                 CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
                        dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
-                rc = -ENOTCONN;
-                goto out;
+                ldlm_callback_reply(req, -ENOTCONN);
+                RETURN(0);
+        }
+
+        if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
+        } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
+        } else {
+                ldlm_callback_reply(req, -EIO);
+                RETURN(0);
         }
 
         LASSERT(req->rq_export != NULL);
@@ -565,27 +569,30 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         ns = req->rq_export->exp_obd->obd_namespace;
         LASSERT(ns != NULL);
 
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
+        if (!lock) {
+                CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
+                       dlm_req->lock_handle1.cookie);
+                ldlm_callback_reply(req, -EINVAL);
+                RETURN(0);
+        }
+
+        /* we want the ost thread to get this reply so that it can respond
+         * to ost requests (write cache writeback) that might be triggered
+         * in the callback */
+        ldlm_callback_reply(req, 0);
+
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
-                rc = ldlm_handle_bl_callback(req, ns);
+                ldlm_handle_bl_callback(req, ns, dlm_req, lock);
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
-                rc = ldlm_handle_cp_callback(req, ns);
+                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
-        default:
-                CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
-                RETURN(-EINVAL);
         }
- out:
-        req->rq_status = rc;
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc)
-                RETURN(rc);
-        ptlrpc_reply(req->rq_svc, req);
 
         RETURN(0);
 }
@@ -610,9 +617,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                        req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
                 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
                 ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
-                CERROR("--> ignoring this error as a temporary workaround!  "
-                       "beware!\n");
-                //RETURN(-ENOTCONN);
+                RETURN(-ENOTCONN);
         }
 
         switch (req->rq_reqmsg->opc) {
index c77b5b8..c76ff32 100644 (file)
 #include <linux/obd_ost.h>
 #include <linux/lustre_net.h>
 
-void ost_pack_ioo(struct obd_ioobj **tmp, struct lov_stripe_md *lsm,int bufcnt)
+void ost_pack_ioo(struct obd_ioobj *ioo, struct lov_stripe_md *lsm, int bufcnt)
 {
-        struct obd_ioobj *ioo = *tmp;
-        void *p = *tmp;
-
         ioo->ioo_id = HTON__u64(lsm->lsm_object_id);
         ioo->ioo_gr = HTON__u64(0);
         ioo->ioo_type = HTON__u32(S_IFREG);
         ioo->ioo_bufcnt = HTON__u32(bufcnt);
-        *tmp = p + sizeof(*ioo);
 }
 
-void ost_unpack_ioo(struct obd_ioobj **tmp, struct obd_ioobj **ioop)
+void ost_unpack_ioo(struct obd_ioobj *dst, struct obd_ioobj *src)
 {
-        void *p = *tmp;
-        struct obd_ioobj *ioo = *tmp;
-        *ioop = *tmp;
-
-        ioo->ioo_id = NTOH__u64(ioo->ioo_id);
-        ioo->ioo_gr = NTOH__u64(ioo->ioo_gr);
-        ioo->ioo_type = NTOH__u32(ioo->ioo_type);
-        ioo->ioo_bufcnt = NTOH__u32(ioo->ioo_bufcnt);
-        *tmp = p + sizeof(*ioo);
+        dst->ioo_id = NTOH__u64(src->ioo_id);
+        dst->ioo_gr = NTOH__u64(src->ioo_gr);
+        dst->ioo_type = NTOH__u32(src->ioo_type);
+        dst->ioo_bufcnt = NTOH__u32(src->ioo_bufcnt);
 }
 
-void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
-                     __u32 xid)
+void ost_pack_niobuf(struct niobuf_remote *nb, __u64 offset, __u32 len,
+                     __u32 flags, __u32 xid)
 {
-        struct niobuf_remote *nb = *tmp;
-        char *c = *tmp;
-
         nb->offset = HTON__u64(offset);
         nb->len = HTON__u32(len);
-        nb->flags = HTON__u32(flags);
         nb->xid = HTON__u32(xid);
-        *tmp = c + sizeof(*nb);
+        nb->flags = HTON__u32(flags);
 }
 
-void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp)
+void ost_unpack_niobuf(struct niobuf_remote *dst, struct niobuf_remote *src)
 {
-        char *c = *tmp;
-        struct niobuf_remote *nb = *tmp;
-
-        *nbp = *tmp;
-
-        nb->offset = NTOH__u64(nb->offset);
-        nb->len = NTOH__u32(nb->len);
-        nb->flags = NTOH__u32(nb->flags);
-
-        *tmp = c + sizeof(*nb);
+        dst->offset = NTOH__u64(src->offset);
+        dst->len = NTOH__u32(src->len);
+        dst->xid = NTOH__u32(src->xid);
+        dst->flags = NTOH__u32(src->flags);
 }
index 590ae4b..82f1164 100644 (file)
@@ -195,7 +195,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
         dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
         dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
-        dlmimp->imp_obd = /* LDLM! */ NULL;
+        dlmimp->imp_obd = target;
         dlmimp->imp_recover = NULL;
         INIT_LIST_HEAD(&dlmimp->imp_replay_list);
         INIT_LIST_HEAD(&dlmimp->imp_sending_list);
@@ -373,13 +373,14 @@ static void process_recovery_queue(struct obd_device *obd)
                                  struct ptlrpc_request, rq_list);
 
                 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
+                        struct l_wait_info lwi = { 0 };
                         spin_unlock_bh(&obd->obd_processing_task_lock);
                         CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
                                LPD64")\n",
                                obd->obd_next_recovery_transno,
                                req->rq_reqmsg->transno);
-                        wait_event(obd->obd_next_transno_waitq,
-                                   check_for_next_transno(obd));
+                        l_wait_event(obd->obd_next_transno_waitq,
+                                     check_for_next_transno(obd), &lwi);
                         spin_lock_bh(&obd->obd_processing_task_lock);
                         if (obd->obd_flags & OBD_ABORT_RECOVERY) {
                                 target_abort_recovery(obd);
index c761a22..665295e 100644 (file)
@@ -4,14 +4,14 @@ DEFS=
 CFLAGS:=-g -O2 -I$(top_srcdir)/utils -I$(PORTALS)/include  -I$(srcdir)/../include -Wall -L$(PORTALSLIB)
 
 KFLAGS:=
-CPPFLAGS = $(HAVE_LIBREADLINE)
-LIBS=
+CPPFLAGS = $(HAVE_EFENCE)
+LIBS = $(LIBEFENCE)
 LLIBS= ../lov/liblov.a ../obdecho/libobdecho.a ../osc/libosc.a ../ldlm/libldlm.a  ../ptlrpc/libptlrpc.a ../obdclass/liblustreclass.a
 
 libtest_LDADD := $(LIBREADLINE)  $(LLIBS) \
                  $(PORTALS)/user/procbridge/libprocbridge.a  $(PORTALS)/user/tcpnal/libtcpnal.a \
-                $(PORTALS)/user/util/libtcpnalutil.a $(PORTALS)/user/$(PORTALS)/api/libptlapi.a \
-                 $(PORTALS)/lib/libptllib.a -lptlctl -lpthread -lefence
+                $(PORTALS)/user/util/libtcpnalutil.a $(PORTALS)/api/libptlapi.a \
+                 $(PORTALS)/lib/libptllib.a -lptlctl -lpthread 
 bin_PROGRAMS = libtest
 libtest_SOURCES = libtest.c
 
index 2941398..c344198 100644 (file)
@@ -10,7 +10,7 @@
 #include <liblustre.h>
 #include <linux/obd.h>
 #include <linux/obd_class.h>
-#include <../user/procbridge/procbridge.h>
+#include <portals/procbridge.h>
 
 ptl_handle_ni_t         tcpnal_ni;
 
index c536a0a..309088b 100644 (file)
@@ -9,7 +9,7 @@ MODULE = llite
 modulefs_DATA = llite.o
 EXTRA_PROGRAMS = llite
 
-llite_SOURCES = dcache.c commit_callback.c super.c rw.c super25.c
+llite_SOURCES = dcache.c commit_callback.c super.c rw.c iod.c super25.c
 llite_SOURCES += file.c dir.c sysctl.c symlink.c
 llite_SOURCES += recover.c namei.c lproc_llite.c
 
index 0e17c1a..f8b7e70 100644 (file)
@@ -84,8 +84,9 @@ static int ll_commitcbd_main(void *arg)
 
         /* And now, loop forever on requests */
         while (1) {
-                wait_event(sbi->ll_commitcbd_waitq,
-                           ll_commitcbd_check_event(sbi));
+                struct l_wait_info lwi = { 0 };
+                l_wait_event(sbi->ll_commitcbd_waitq,
+                             ll_commitcbd_check_event(sbi), &lwi);
 
                 spin_lock(&sbi->ll_commitcbd_lock);
                 if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING) {
@@ -112,6 +113,7 @@ static int ll_commitcbd_main(void *arg)
 int ll_commitcbd_setup(struct ll_sb_info *sbi)
 {
         int rc;
+        struct l_wait_info lwi = { 0 };
         ENTRY;
 
         rc = kernel_thread(ll_commitcbd_main, (void *) sbi,
@@ -120,18 +122,19 @@ int ll_commitcbd_setup(struct ll_sb_info *sbi)
                 CERROR("cannot start thread\n");
                 RETURN(rc);
         }
-        wait_event(sbi->ll_commitcbd_ctl_waitq,
-                   sbi->ll_commitcbd_flags & LL_COMMITCBD_RUNNING);
+        l_wait_event(sbi->ll_commitcbd_ctl_waitq,
+                     sbi->ll_commitcbd_flags & LL_COMMITCBD_RUNNING, &lwi);
         RETURN(0);
 }
 
 
 int ll_commitcbd_cleanup(struct ll_sb_info *sbi)
 {
+        struct l_wait_info lwi = { 0 };
         sbi->ll_commitcbd_flags = LL_COMMITCBD_STOPPING;
 
         wake_up(&sbi->ll_commitcbd_waitq);
-        wait_event(sbi->ll_commitcbd_ctl_waitq,
-                   sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPED);
+        l_wait_event(sbi->ll_commitcbd_ctl_waitq,
+                     sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPED, &lwi);
         RETURN(0);
 }
index ff5d1d6..4c16e1c 100644 (file)
@@ -124,6 +124,11 @@ static int ll_file_release(struct inode *inode, struct file *file)
         if (!fd) /* no process opened the file after an mcreate */
                 RETURN(rc = 0);
 
+        /* we might not be able to get a valid handle on this file
+         * again so we really want to flush our write cache.. */
+        filemap_fdatasync(inode->i_mapping);
+        filemap_fdatawait(inode->i_mapping);
+
         if (lsm != NULL) {
                 memset(&oa, 0, sizeof(oa));
                 oa.o_id = lsm->lsm_object_id;
@@ -182,17 +187,17 @@ static int ll_osc_open(struct lustre_handle *conn, struct inode *inode,
                 RETURN(-ENOMEM);
         oa->o_id = lsm->lsm_object_id;
         oa->o_mode = S_IFREG;
-        oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                       OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+        oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLBLOCKS |
+                       OBD_MD_FLMTIME | OBD_MD_FLCTIME);
         rc = obd_open(conn, oa, lsm, NULL);
         if (rc)
                 GOTO(out, rc);
 
         file->f_flags &= ~O_LOV_DELAY_CREATE;
-        obdo_to_inode(inode, oa, (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                                  OBD_MD_FLMTIME | OBD_MD_FLCTIME));
+        obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS | OBD_MD_FLMTIME |
+                      OBD_MD_FLCTIME);
 
-        if (oa->o_valid |= OBD_MD_FLHANDLE)
+        if (oa->o_valid & OBD_MD_FLHANDLE)
                 memcpy(fd->fd_ostdata, obdo_handle(oa), FD_OSTDATA_SIZE);
 
         EXIT;
@@ -355,85 +360,140 @@ static int ll_file_open(struct inode *inode, struct file *file)
         return rc;
 }
 
-int ll_size_lock(struct inode *inode, struct lov_stripe_md *lsm, obd_off start,
-                 int mode, struct lustre_handle *lockh)
+/*
+ * Really does the getattr on the inode and updates its fields.
+ *
+ * Builds an obdo for lsm->lsm_object_id asking for size, blocks, mtime and
+ * ctime, and, when @ostdata is non-NULL, copies FD_OSTDATA_SIZE bytes of it
+ * into o_inline and sets OBD_MD_FLHANDLE.  If obd_getattr() fails its error
+ * is returned before the inode is touched; on success the obdo is passed
+ * to obdo_to_inode() for those four attributes and 0 is returned.
+ *
+ * NOTE(review): the CDEBUG below passes inode->i_size for both "%Lu"
+ * conversions, so the same value is printed twice.
+ */
+int ll_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm,
+                     char *ostdata)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct obdo oa;
+        int rc;
+        ENTRY;
+
+        LASSERT(lsm);
+        LASSERT(sbi);
+
+        memset(&oa, 0, sizeof oa);
+        oa.o_id = lsm->lsm_object_id;
+        oa.o_mode = S_IFREG;
+        oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
+                OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+        if (ostdata != NULL) {
+                memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE);
+                oa.o_valid |= OBD_MD_FLHANDLE;
+        }
+
+        rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
+        if (rc)
+                RETURN(rc);
+
+        obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                           OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+
+        CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lu\n", lsm->lsm_object_id,
+               inode->i_size, inode->i_size);
+        RETURN(0);
+}
+
+/*
+ * we've acquired a lock and need to see if we should perform a getattr
+ * to update the file size that may have been updated by others that had
+ * their locks canceled.
+ *
+ * LLI_F_DID_GETATTR in lli_flags records that a getattr has already
+ * succeeded; if it is set we return 0 at once.  Otherwise the flag is
+ * re-tested under lli_getattr_sem, ll_inode_getattr() is called, and the
+ * flag is set only when that call returns 0.  Returns 0 or the
+ * ll_inode_getattr() error.  @extent is not used in this function.
+ */
+static int ll_size_validate(struct inode *inode, struct lov_stripe_md *lsm,
+                            char *ostdata, struct ldlm_extent *extent)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        int rc = 0;
+        ENTRY;
+
+        if (test_bit(LLI_F_DID_GETATTR, &lli->lli_flags))
+                RETURN(0);
+
+        down(&lli->lli_getattr_sem);
+
+        if (!test_bit(LLI_F_DID_GETATTR, &lli->lli_flags)) {
+                rc = ll_inode_getattr(inode, lsm, ostdata);
+                if ( rc == 0 ) 
+                        set_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
+        }
+
+        up(&lli->lli_getattr_sem);
+        RETURN(rc);
+}
+
+/*
+ * Take an extent lock on the inode's objects without refreshing i_size.
+ *
+ * Some callers, notably truncate, really don't want i_size set based on
+ * the size returned by the getattr, or lock acquisition in the future.
+ *
+ * @lockh must be zeroed on entry (asserted).  Nothing is enqueued and 0 is
+ * returned when @fd carries LL_FILE_IGNORE_LOCK or the superblock has
+ * LL_SBI_NOLCK set; @fd may be NULL.  Otherwise @extent and @lockh are
+ * passed to obd_enqueue() and its result is returned.
+ */
+int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
+                   struct lov_stripe_md *lsm,
+                   int mode, struct ldlm_extent *extent,
+                   struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct ldlm_extent extent;
         int rc, flags = 0;
         ENTRY;
 
+        LASSERT(lockh->addr == 0 && lockh->cookie == 0);
+
         /* XXX phil: can we do this?  won't it screw the file size up? */
-        if (sbi->ll_flags & LL_SBI_NOLCK)
+        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
+            (sbi->ll_flags & LL_SBI_NOLCK))
                 RETURN(0);
 
-        extent.start = start;
-        extent.end = OBD_OBJECT_EOF;
+        CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
+               inode->i_ino, extent->start, extent->end);
 
+        /* extent is a pointer here, so pass the size of the struct it
+         * points at rather than the size of the pointer */
-        rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent,
-                         sizeof(extent), mode, &flags, ll_lock_callback,
+        rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, extent,
+                         sizeof(*extent), mode, &flags, ll_lock_callback,
                          inode, sizeof(*inode), lockh);
+
         RETURN(rc);
 }
-
-int ll_size_unlock(struct inode *inode, struct lov_stripe_md *lsm, int mode,
+/*
+ * this grabs a lock and manually implements behaviour that makes it look
+ * like the OST is returning the file size with each lock acquisition
+ */
+int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
+                   struct lov_stripe_md *lsm,
+                   int mode, struct ldlm_extent *extent,
                    struct lustre_handle *lockh)
 {
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
         int rc;
         ENTRY;
 
-        /* XXX phil: can we do this?  won't it screw the file size up? */
-        if (sbi->ll_flags & LL_SBI_NOLCK)
-                RETURN(0);
+        rc = ll_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh);
 
-        rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
-        if (rc != ELDLM_OK) {
-                CERROR("lock cancel: %d\n", rc);
-                LBUG();
+        if (rc == ELDLM_OK) {
+                rc = ll_size_validate(inode, lsm, fd ? fd->fd_ostdata : NULL,
+                        extent);
+                if ( rc != 0 ) {
+                        ll_extent_unlock(fd, inode, lsm, mode, lockh);
+                        rc = ELDLM_GETATTR_ERROR;
+                }
         }
 
         RETURN(rc);
 }
 
-/* This function is solely "sampling" the file size, and does not explicit
- * locking on the size itself (see ll_size_lock() and ll_size_unlock()).
- *
- * XXX We need to optimize away the obd_getattr for decent performance here,
- *     by checking if we already have the size lock and considering our size
- *     authoritative in that case.  In order to do that either the act of
- *     getting the size lock includes retrieving the file size, or the client
- *     keeps an atomic flag in the inode which indicates whether the size
- *     has been updated (see bug 280).
- */
-int ll_file_size(struct inode *inode, struct lov_stripe_md *lsm, char *ostdata)
+int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
+                struct lov_stripe_md *lsm, int mode,
+                struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct obdo oa;
         int rc;
         ENTRY;
 
-        LASSERT(lsm);
-        LASSERT(sbi);
-
-        memset(&oa, 0, sizeof oa);
-        oa.o_id = lsm->lsm_object_id;
-        oa.o_mode = S_IFREG;
-        oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
-
-        if (ostdata != NULL) {
-                memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE);
-                oa.o_valid |= OBD_MD_FLHANDLE;
-        }
+        /* XXX phil: can we do this?  won't it screw the file size up? */
+        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
+            (sbi->ll_flags & LL_SBI_NOLCK))
+                RETURN(0);
 
-        rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
-        if (!rc) {
-                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-                CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lx\n",
-                       lsm->lsm_object_id, inode->i_size, inode->i_size);
-        }
+        rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
 
         RETURN(rc);
 }
@@ -481,6 +541,7 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                      void *data, int flag)
 {
         struct inode *inode = data;
+        struct ll_inode_info *lli = ll_i2info(inode);
         struct lustre_handle lockh = { 0, 0 };
         int rc;
         ENTRY;
@@ -497,11 +558,15 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
                 break;
         case LDLM_CB_CANCELING:
+                /* FIXME: we could be given 'canceling intents' so that we
+                 * could know to write-back or simply throw away the pages
+                 * based on if the cancel comes from a desire to, say,
+                 * read or truncate.. */
                 CDEBUG(D_INODE, "invalidating obdo/inode %lu\n", inode->i_ino);
-                /* FIXME: do something better than throwing away everything */
-                //down(&inode->i_sem);
-                ll_invalidate_inode_pages(inode);
-                //up(&inode->i_sem);
+                filemap_fdatasync(inode->i_mapping);
+                filemap_fdatawait(inode->i_mapping);
+                clear_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
+                truncate_inode_pages(inode->i_mapping, 0);
                 break;
         default:
                 LBUG();
@@ -515,57 +580,49 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
 {
         struct ll_file_data *fd = filp->private_data;
         struct inode *inode = filp->f_dentry->d_inode;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lov_stripe_md *lsm = lli->lli_smd;
         struct lustre_handle lockh = { 0, 0 };
-        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        int flags = 0;
+        struct ll_read_extent rextent;
         ldlm_error_t err;
         ssize_t retval;
         ENTRY;
-
         CDEBUG(D_VFSTRACE, "VFS Op\n");
-        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
-            !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                struct ldlm_extent extent;
-                extent.start = *ppos;
-                extent.end = *ppos + count - 1;
-                CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
-                       inode->i_ino, extent.start, extent.end);
-
-                err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
-                                  &extent, sizeof(extent), LCK_PR, &flags,
-                                  ll_lock_callback, inode, sizeof(*inode),
-                                  &lockh);
-                if (err != ELDLM_OK) {
-                        CERROR("lock enqueue: err: %d\n", err);
-                        RETURN(err);
-                }
-        }
 
-        /* If we don't refresh the file size, generic_file_read may not even
-         * call ll_readpage */
-        retval = ll_file_size(inode, lsm, fd->fd_ostdata);
-        if (retval < 0) {
-                CERROR("ll_file_size: "LPSZ"\n", retval);
+        /* "If nbyte is 0, read() will return 0 and have no other results."
+         *                      -- Single Unix Spec */
+        if (count == 0)
+                RETURN(0);
+
+        rextent.re_extent.start = *ppos;
+        rextent.re_extent.end = *ppos + count - 1;
+
+        err = ll_extent_lock(fd, inode, lsm, 
+                             LCK_PR, &rextent.re_extent, &lockh);
+        if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+                retval = -ENOLCK;
                 RETURN(retval);
         }
 
+        /* XXX tell ll_readpage what pages have a PR lock.. */
+        rextent.re_task = current;
+        spin_lock(&lli->lli_read_extent_lock);
+        list_add(&rextent.re_lli_item, &lli->lli_read_extents);
+        spin_unlock(&lli->lli_read_extent_lock);
+
         CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
                inode->i_ino, count, *ppos);
         retval = generic_file_read(filp, buf, count, ppos);
 
+        spin_lock(&lli->lli_read_extent_lock);
+        list_del(&rextent.re_lli_item);
+        spin_unlock(&lli->lli_read_extent_lock);
+
         if (retval > 0)
                 ll_update_atime(inode);
 
-        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
-            !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, &lockh);
-                if (err != ELDLM_OK) {
-                        CERROR("lock cancel: err: %d\n", err);
-                        retval = err;
-                }
-        }
-
+        /* XXX errors? */
+        ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
         RETURN(retval);
 }
 
@@ -577,71 +634,43 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
 {
         struct ll_file_data *fd = file->private_data;
         struct inode *inode = file->f_dentry->d_inode;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct lustre_handle lockh = { 0, 0 }, eof_lockh = { 0, 0 };
+        struct lustre_handle lockh = { 0, 0 };
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        int flags = 0;
+        struct ldlm_extent extent;
         ldlm_error_t err;
         ssize_t retval;
         ENTRY;
 
         /* POSIX, but surprised the VFS doesn't check this already */
         if (count == 0)
-                return 0;
+                RETURN(0);
 
         CDEBUG(D_VFSTRACE, "VFS Op\n");
         if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
-                err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockh);
-                if (err)
-                        RETURN(err);
-
-                /* Get size here so we know extent to enqueue write lock on. */
-                retval = ll_file_size(inode, lsm, fd->fd_ostdata);
-                if (retval)
-                        GOTO(out_eof, retval);
-
-                *ppos = inode->i_size;
-        }
-
-        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
-            !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                struct ldlm_extent extent;
+                extent.start = 0;
+                extent.end = OBD_OBJECT_EOF;
+        } else  {
                 extent.start = *ppos;
                 extent.end = *ppos + count - 1;
-                CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
-                       inode->i_ino, extent.start, extent.end);
-
-                err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
-                                  &extent, sizeof(extent), LCK_PW, &flags,
-                                  ll_lock_callback, inode, sizeof(*inode),
-                                  &lockh);
-                if (err != ELDLM_OK) {
-                        CERROR("lock enqueue: err: %d\n", err);
-                        GOTO(out_eof, retval = err);
-                }
         }
 
+        err = ll_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh);
+        if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+                retval = -ENOLCK;
+                RETURN(retval);
+        }
+
+        if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND)
+                *ppos = inode->i_size;
+
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, count, *ppos);
 
         retval = generic_file_write(file, buf, count, ppos);
 
-        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
-            !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, &lockh);
-                if (err != ELDLM_OK)
-                        CERROR("lock cancel: err: %d\n", err);
-        }
-
-        EXIT;
- out_eof:
-        if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
-                err = ll_size_unlock(inode, lsm, LCK_PW, &eof_lockh);
-                if (err)
-                        CERROR("ll_size_unlock: %d\n", err);
-        }
-
-        return retval;
+        /* XXX errors? */
+        ll_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
+        RETURN(retval);
 }
 
 static int ll_lov_setstripe(struct inode *inode, struct file *file,
@@ -749,25 +778,27 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
 {
         struct inode *inode = file->f_dentry->d_inode;
-        long long retval;
+        struct ll_file_data *fd = file->private_data;
+        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+        struct lustre_handle lockh = {0, 0};
+        loff_t retval;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op\n");
-        switch (origin) {
-        case 2: {
-                struct ll_inode_info *lli = ll_i2info(inode);
-                struct ll_file_data *fd = file->private_data;
-
-                retval = ll_file_size(inode, lli->lli_smd, fd->fd_ostdata);
-                if (retval)
+        if (origin == 2) { /* SEEK_END */
+                ldlm_error_t err;
+                struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+                err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
+                if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+                        retval = -ENOLCK;
                         RETURN(retval);
+                }
 
                 offset += inode->i_size;
-                break;
-        }
-        case 1:
+        } else if (origin == 1) { /* SEEK_CUR */
                 offset += file->f_pos;
         }
+
         retval = -EINVAL;
         if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
                 if (offset != file->f_pos) {
@@ -779,14 +810,28 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
                 }
                 retval = offset;
         }
+
+        if (origin == 2)
+                ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
         RETURN(retval);
 }
 
-/* XXX this does not need to do anything for data, it _does_ need to
-   call setattr */
 int ll_fsync(struct file *file, struct dentry *dentry, int data)
 {
-        return 0;
+        int ret;
+        ENTRY;
+
+        /*
+         * filemap_fdata{sync,wait} are also called at PW lock cancelation so
+         * we know that they can only find data to writeback here if we are
+         * still holding the PW lock that covered the dirty pages.  XXX we
+         * should probably get a reference on it, though, just to be clear.
+         */
+        ret = filemap_fdatasync(dentry->d_inode->i_mapping);
+        if ( ret == 0 )
+                ret = filemap_fdatawait(dentry->d_inode->i_mapping);
+
+        RETURN(ret);
 }
 
 int ll_inode_revalidate(struct dentry *dentry)
@@ -848,10 +893,24 @@ int ll_inode_revalidate(struct dentry *dentry)
         if (!lsm)       /* object not yet allocated, don't validate size */
                 RETURN(0);
 
-        /* XXX this should probably become an unconditional obd_getattr()
-         *     so that we update the blocks count and mtime from the OST too.
+        /*
+         * unfortunately stat comes in through revalidate and we don't
+         * differentiate this use from initial instantiation.  we're
+         * also being wildly conservative and flushing write caches
+         * so that stat really returns the proper size.
          */
-        RETURN(ll_file_size(inode, lsm, NULL));
+        {
+                struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+                struct lustre_handle lockh = {0, 0};
+                ldlm_error_t err;
+
+                err = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
+                if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED )
+                        RETURN(-abs(err)); /* XXX can't be right */
+
+                ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
+        }
+        RETURN(0);
 }
 
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
@@ -889,7 +948,7 @@ struct file_operations ll_file_operations = {
         release:        ll_file_release,
         mmap:           generic_file_mmap,
         llseek:         ll_file_seek,
-        fsync:          NULL
+        fsync:          ll_fsync,
 };
 
 struct inode_operations ll_file_inode_operations = {
diff --git a/lustre/llite/iod.c b/lustre/llite/iod.c
new file mode 100644 (file)
index 0000000..3a045f4
--- /dev/null
@@ -0,0 +1,415 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  Copyright (C) 2002, 2003  Cluster File Systems, Inc
+ *
+ *  this started as an implementation of an io daemon that woke regularly
+ *  to force writeback.. the throttling in prepare_write and kupdate's usual
+ *  writeback pressure got rid of our thread, but the file name remains.
+ */
+#include <linux/version.h>
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/stat.h>
+#include <linux/sched.h>
+#include <linux/smp_lock.h>
+#include <linux/kmod.h>
+#include <linux/pagemap.h>
+#include <linux/mm.h>
+
+/* PG_inactive_clean is shorthand for rmap, we want free_high/low here.. */
+#ifdef PG_inactive_clean
+#include <linux/mm_inline.h>
+#endif
+
+#define DEBUG_SUBSYSTEM S_LLITE
+#include <linux/lustre_lite.h>
+
+#ifndef list_for_each_prev_safe
+/*
+ * walk a list backwards, starting at (head)->prev and stopping when the
+ * cursor gets back to head.  'n' is loaded with pos->prev before the loop
+ * body runs and the cursor advances through 'n', not through pos, so the
+ * body may take pos off the list.  only defined here when the name is not
+ * already defined.
+ */
+#define list_for_each_prev_safe(pos, n, head) \
+        for (pos = (head)->prev, n = pos->prev; pos != (head); \
+                pos = n, n = pos->prev )
+#endif
+
+extern spinlock_t inode_lock;
+
+/* number of entries in ll_writeback_pages.pgs */
+#define LLWP_MAX_PAGES (PTL_MD_MAX_IOV)
+/*
+ * one batch of pages queued for a single write.  llwp_consume_page fills
+ * it in and reports when the batch is closed.
+ */
+struct ll_writeback_pages {
+        /*
+         * has_whole_pages: set once a full PAGE_SIZE page has been queued
+         * num_frags:       bumped for the first whole page and for every
+         *                  partial page; the batch is closed at 3
+         * num_pages:       how many entries of pgs are in use
+         */
+        unsigned        has_whole_pages:1,
+                        num_frags:2,
+                        num_pages:29;
+        struct brw_page pgs[LLWP_MAX_PAGES];
+};
+
+
+/*
+ * Order the brw_page array by ascending file offset so that disk allocation
+ * on the target happens in offset order.
+ *
+ * This is a plain shellsort: an insertion sort over elements that are 'gap'
+ * slots apart, repeated with a shrinking gap (..., 13, 4, 1) until the final
+ * pass with a gap of one leaves the array sorted.  It needs no allocation
+ * and is quick enough for our small page arrays.
+ */
+void sort_brw_pages(struct brw_page *array, int num)
+{
+        struct brw_page held;
+        int gap = 1, cur, slot;
+
+        if ( num == 1 )
+                return;
+
+        /* grow the gap to the first value of 1, 4, 13, ... that is >= num */
+        while ( gap < num )
+                gap = (gap * 3) + 1;
+
+        do {
+                gap /= 3;
+                for ( cur = gap ; cur < num ; cur++ ) {
+                        held = array[cur];
+                        /* slide larger elements up until held fits */
+                        for ( slot = cur ; slot >= gap ; slot -= gap ) {
+                                if ( array[slot - gap].off <= held.off )
+                                        break;
+                                array[slot] = array[slot - gap];
+                        }
+                        array[slot] = held;
+                }
+        } while ( gap > 1 );
+}
+
+/*
+ * Queue one locked dirty page in llwp for writeback.
+ *
+ * If the page starts at or beyond i_size we raced with truncate: the page
+ * is unlocked and left out of the array, because truncate_inode_pages will
+ * be waiting to clean it up.  Otherwise an extra page reference is taken
+ * and the page is appended to llwp->pgs, with the byte count trimmed when
+ * the file ends in the middle of the page.
+ *
+ * i_size is sampled once, so the truncate check, the partial-page trimming
+ * and the debug message all see the same size even if it changes while we
+ * run.  With a single sample a queued page always carries at least one
+ * byte, which is what the assertion below checks.
+ *
+ * returns 0 if the caller may keep adding pages (this includes the
+ * truncate race) and -1 once the batch is closed, i.e. it holds three
+ * fragments or LLWP_MAX_PAGES pages.
+ */
+static int llwp_consume_page(struct ll_writeback_pages *llwp,
+                             struct inode *inode, struct page *page)
+{
+        obd_off off = ((obd_off)page->index) << PAGE_SHIFT;
+        obd_off isize = inode->i_size;
+        struct brw_page *pg;
+
+        /* we raced with truncate? */
+        if ( off >= isize ) {
+                unlock_page(page);
+                goto out;
+        }
+
+        page_cache_get(page);
+        pg = &llwp->pgs[llwp->num_pages];
+        llwp->num_pages++;
+
+        pg->pg = page;
+        pg->off = off;
+        pg->flag = OBD_BRW_CREATE;
+        pg->count = PAGE_SIZE;
+
+        /* catch partial writes for files that end mid-page */
+        if ( pg->off + pg->count > isize )
+                pg->count = isize & ~PAGE_MASK;
+
+        /* all whole pages together count as one fragment, each partial
+         * page counts as a fragment of its own */
+        if ( pg->count == PAGE_SIZE ) {
+                if ( ! llwp->has_whole_pages ) {
+                        llwp->has_whole_pages = 1;
+                        llwp->num_frags++;
+                }
+        } else {
+                llwp->num_frags++;
+        }
+
+        /*
+         * matches ptlrpc_bulk_get assert that trickles down
+         * from a 0 page length going through niobuf and into
+         * the buffer regions being posted
+         */
+        LASSERT(pg->count > 0);
+
+        CDEBUG(D_CACHE, "brw_page %p: off "LPU64" cnt %d, page %p: ind %ld"
+                        " i_size: "LPU64"\n", pg, pg->off, pg->count, page,
+                        page->index, isize);
+
+        if ( llwp->num_frags == 3 || llwp->num_pages == LLWP_MAX_PAGES )
+                return -1;
+
+out:
+        return 0;
+}
+
+/*
+ * moves dirty pages of the inode into llwp until llwp_consume_page says the
+ * batch is closed or the dirty list has been walked.  nothing is returned;
+ * callers look at llwp->num_pages to see how many pages were queued.
+ *
+ * this duplicates filemap_fdatasync and gives us an opportunity to grab lots
+ * of dirty pages..
+ */
+static void ll_get_dirty_pages(struct inode *inode,
+                               struct ll_writeback_pages *llwp)
+{
+        struct address_space *mapping = inode->i_mapping;
+        struct page *page;
+        struct list_head *pos, *n;
+        ENTRY;
+
+        spin_lock(&pagecache_lock);
+
+        /* walk from the tail of the dirty list; the _safe iterator lets us
+         * move the current page onto another list as we go */
+        list_for_each_prev_safe(pos, n, &mapping->dirty_pages) {
+                page = list_entry(pos, struct page, list);
+
+                /* never sleep under the spinlock: skip pages that are
+                 * already locked by someone else */
+                if (TryLockPage(page))
+                        continue;
+
+                list_del(&page->list);
+                list_add(&page->list, &mapping->locked_pages);
+
+                /* on the dirty list without the dirty bit: nothing to
+                 * write, the page stays on locked_pages */
+                if ( ! PageDirty(page) ) {
+                        unlock_page(page);
+                        continue;
+                }
+                ClearPageDirty(page);
+
+                /* non-zero means the batch is closed */
+                if ( llwp_consume_page(llwp, inode, page) != 0)
+                        break;
+        }
+
+        spin_unlock(&pagecache_lock);
+        EXIT;
+}
+
+/*
+ * Write out the pages collected in llwp with one obd_brw call that is
+ * waited for, then unlock every page and drop the reference that was taken
+ * when the page was queued.
+ *
+ * The pages are unlocked and released on every path.  In particular a
+ * failure to allocate the brw set must not return early, or every queued
+ * page would stay locked and referenced forever.  Errors are only logged;
+ * as with an obd_brw failure the pages are not re-dirtied here.
+ */
+static void ll_brw_pages_unlock( struct inode *inode,
+                                 struct ll_writeback_pages *llwp)
+{
+        int rc, i;
+        struct obd_brw_set *set;
+        ENTRY;
+
+        sort_brw_pages(llwp->pgs, llwp->num_pages);
+
+        set = obd_brw_set_new();
+        if (set == NULL) {
+                CERROR("can't allocate brw set, %d pages not written\n",
+                       llwp->num_pages);
+                goto unlock;
+        }
+        set->brw_callback = ll_brw_sync_wait;
+
+        rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
+                     ll_i2info(inode)->lli_smd, llwp->num_pages, llwp->pgs,
+                     set, NULL);
+        if (rc) {
+                CERROR("error from obd_brw: rc = %d\n", rc);
+        } else {
+                rc = ll_brw_sync_wait(set, CB_PHASE_START);
+                if (rc)
+                        CERROR("error from callback: rc = %d\n", rc);
+        }
+        obd_brw_set_decref(set);
+
+unlock:
+        for ( i = 0 ; i < llwp->num_pages ; i++) {
+                struct page *page = llwp->pgs[i].pg;
+
+                CDEBUG(D_CACHE, "cleaning page %p\n", page);
+                LASSERT(PageLocked(page));
+                unlock_page(page);
+                page_cache_release(page);
+        }
+
+        EXIT;
+}
+
+#ifndef PG_inactive_clean
+#ifdef CONFIG_DISCONTIGMEM
+#error "sorry, we don't support DISCONTIGMEM yet"
+#endif
+/*
+ * Report whether any memory zone is currently flagged as needing balancing.
+ *
+ * __alloc_pages sets the flag on a zone when an allocation is performed
+ * while the zone has fewer free pages than its 'low' water mark, and the
+ * flag is cleared when try_to_free_pages makes progress.
+ *
+ * returns 1 as soon as a flagged zone is found, 0 if there is none.
+ */
+static int zones_need_balancing(void)
+{
+        pg_data_t *node = pgdat_list;
+        int idx;
+
+        while ( node != NULL ) {
+                /* highest zone first, down to zone 0 */
+                for ( idx = node->nr_zones ; idx-- > 0 ; ) {
+                        if ( node->node_zones[idx].need_balance )
+                                return 1;
+                }
+                node = node->node_next;
+        }
+        return 0;
+}
+#endif
+/*
+ * Decide whether the VM looks short enough on memory that we should push
+ * dirty data out now; returns 1 if so and 0 otherwise.
+ *
+ * 2.4 gives us no way to find out how many pages we have cached because we
+ * are not using buffer_heads.  So we are very conservative and flush the
+ * superblock of all dirty data whenever the vm (rmap or stock) thinks it is
+ * running low and kswapd would have done work.  kupdated isn't good enough
+ * because writers (dbench) can dirty _very quickly_, and we allocate under
+ * writepage..
+ *
+ * 2.5 gets this right, see the {inc,dec}_page_state(nr_dirty, )
+ */
+static int should_writeback(void)
+{
+#ifdef PG_inactive_clean
+        /* rmap vm */
+        return free_high(ALL_ZONES) > 0 || free_low(ANY_ZONE) > 0;
+#else
+        /* stock vm */
+        return zones_need_balancing() != 0;
+#endif
+}
+
+/*
+ * Throttle writers: while should_writeback() says memory is tight, write
+ * back dirty pages of the superblock's dirty inodes, then wait for inodes
+ * that are still locked.
+ *
+ * returns the number of pages handed to ll_brw_pages_unlock, 0 when no
+ * writeback was needed, or -ENOMEM if the page batch could not be
+ * allocated.
+ */
+int ll_check_dirty( struct super_block *sb)
+{
+        unsigned long old_flags; /* hack? */
+        int making_progress;
+        struct ll_writeback_pages *llwp;
+        struct inode *inode;
+        int rc = 0;
+        ENTRY;
+
+        /* RETURN, not return: keep the ENTRY/exit trace balanced */
+        if ( ! should_writeback() )
+                RETURN(0);
+
+        old_flags = current->flags;
+        current->flags |= PF_MEMALLOC;
+        llwp = kmalloc(sizeof(struct ll_writeback_pages), GFP_ATOMIC);
+        if ( llwp == NULL )
+                GOTO(cleanup, rc = -ENOMEM);
+        memset(llwp, 0, offsetof(struct ll_writeback_pages, pgs));
+
+        spin_lock(&inode_lock);
+
+        /*
+         * first we try and write back dirty pages from dirty inodes
+         * until the VM thinks we're ok again..
+         */
+        do {
+                struct list_head *pos;
+                inode = NULL;
+                making_progress = 0;
+
+                /* find the inode closest to the tail of s_dirty that
+                 * has dirty pages */
+                list_for_each_prev(pos, &sb->s_dirty) {
+                        inode = list_entry(pos, struct inode, i_list);
+
+                        if ( ! (inode->i_state & I_DIRTY_PAGES) ) {
+                                inode = NULL;
+                                continue;
+                        }
+                        break;
+                }
+
+                if ( inode == NULL )
+                        break;
+
+                /* duplicate __sync_one, *sigh* */
+                list_del(&inode->i_list);
+                list_add(&inode->i_list, &inode->i_sb->s_locked_inodes);
+                inode->i_state |= I_LOCK;
+                inode->i_state &= ~I_DIRTY_PAGES;
+
+                spin_unlock(&inode_lock);
+
+                /* write this inode out one batch at a time */
+                do {
+                        memset(llwp, 0, sizeof(*llwp));
+                        ll_get_dirty_pages(inode, llwp);
+                        if ( llwp->num_pages ) {
+                                ll_brw_pages_unlock(inode, llwp);
+                                rc += llwp->num_pages;
+                                making_progress = 1;
+                        }
+                } while (llwp->num_pages && should_writeback() );
+
+                spin_lock(&inode_lock);
+
+                if ( ! list_empty(&inode->i_mapping->dirty_pages) )
+                        inode->i_state |= I_DIRTY_PAGES;
+
+                inode->i_state &= ~I_LOCK;
+                /*
+                 * we are sneaky and leave the inode on the dirty list,
+                 * even though it might not still be..
+                 */
+                if (!(inode->i_state & I_FREEING)) {
+                        list_del(&inode->i_list);
+                        list_add(&inode->i_list, &inode->i_sb->s_dirty);
+                }
+                wake_up(&inode->i_wait);
+
+        } while ( making_progress && should_writeback() );
+
+        /*
+         * and if that didn't work, we sleep on any data that might
+         * be under writeback..
+         */
+        while ( should_writeback() ) {
+                if ( list_empty(&sb->s_locked_inodes) )
+                        break;
+
+                inode = list_entry(sb->s_locked_inodes.next, struct inode,
+                                i_list);
+
+                /* hold the inode across the sleep, inode_lock is dropped */
+                atomic_inc(&inode->i_count); /* XXX hack? */
+                spin_unlock(&inode_lock);
+                wait_event(inode->i_wait, !(inode->i_state & I_LOCK));
+                iput(inode);
+                spin_lock(&inode_lock);
+        }
+
+        spin_unlock(&inode_lock);
+
+cleanup:
+        if ( llwp != NULL )
+                kfree(llwp);
+        current->flags = old_flags;
+
+        RETURN(rc);
+}
+
+/*
+ * Write back 'page' along with whatever other dirty pages of the inode fit
+ * in the same batch.  PF_MEMALLOC is set on the current task for the
+ * duration of the call and its previous flags are restored on the way out.
+ *
+ * returns 0, or -ENOMEM if the batch could not be allocated.
+ */
+int ll_batch_writepage( struct inode *inode, struct page *page )
+{
+        struct ll_writeback_pages *batch;
+        unsigned long saved_flags = current->flags; /* hack? */
+        int rc = 0;
+        ENTRY;
+
+        current->flags |= PF_MEMALLOC;
+        batch = kmalloc(sizeof(*batch), GFP_ATOMIC);
+        if ( batch == NULL )
+                GOTO(cleanup, rc = -ENOMEM);
+        /* only the counters in front of the page array need clearing */
+        memset(batch, 0, offsetof(struct ll_writeback_pages, pgs));
+
+        /* the page we were asked about goes in first, then its neighbours */
+        llwp_consume_page(batch, inode, page);
+        ll_get_dirty_pages(inode, batch);
+
+        if ( batch->num_pages != 0 )
+                ll_brw_pages_unlock(inode, batch);
+
+cleanup:
+        if ( batch != NULL )
+                kfree(batch);
+        current->flags = saved_flags;
+        RETURN(rc);
+}
index 6818ace..409f308 100644 (file)
@@ -33,6 +33,7 @@
 #include <asm/system.h>
 #include <asm/uaccess.h>
 
+
 #include <linux/fs.h>
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 #include <linux/buffer_head.h>
@@ -70,6 +71,7 @@ static void __set_page_clean(struct page *page)
         list_del(&page->list);
         list_add(&page->list, &mapping->clean_pages);
 
+        /* XXX doesn't inode_lock protect i_state ? */
         inode = mapping->host;
         if (list_empty(&mapping->dirty_pages)) {
                 CDEBUG(D_INODE, "inode clean\n");
@@ -81,7 +83,7 @@ static void __set_page_clean(struct page *page)
         EXIT;
 }
 
-inline void set_page_clean(struct page *page)
+void set_page_clean(struct page *page)
 {
         if (PageDirty(page)) {
                 ClearPageDirty(page);
@@ -90,7 +92,7 @@ inline void set_page_clean(struct page *page)
 }
 
 /* SYNCHRONOUS I/O to object storage for an inode */
-static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
+static int ll_brw(int cmd, struct inode *inode, struct page *page, int flags)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
@@ -112,8 +114,8 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
                 pg.count = PAGE_SIZE;
 
         CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
-              cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
-              pg.off, pg.off);
+               cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
+               pg.off, pg.off);
         if (pg.count == 0) {
                 CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
                        LPU64"\n",
@@ -121,7 +123,7 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
                        page->mapping->host->i_size, page->index, pg.off);
         }
 
-        pg.flag = create ? OBD_BRW_CREATE : 0;
+        pg.flag = flags;
 
         set->brw_callback = ll_brw_sync_wait;
         rc = obd_brw(cmd, ll_i2obdconn(inode), lsm, 1, &pg, set, NULL);
@@ -133,56 +135,174 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
                 if (rc)
                         CERROR("error from callback: rc = %d\n", rc);
         }
-        obd_brw_set_free(set);
+        obd_brw_set_decref(set);
 
         RETURN(rc);
 }
 
-/* returns the page unlocked, but with a reference */
-static int ll_readpage(struct file *file, struct page *page)
+/*
+ * read first_page plus a batch of the pages after it in a single obd_brw, to
+ * vaguely simulate 2.5's readpages.  first_page is unlocked on every return.
+ */
+static int ll_readpage(struct file *file, struct page *first_page)
 {
-        struct inode *inode = page->mapping->host;
-        obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
-        int rc = 0;
+        struct inode *inode = first_page->mapping->host;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct page *page = first_page;
+        struct list_head *pos;
+        struct brw_page *pgs;
+        struct obd_brw_set *set;
+        unsigned long end_index, extent_end = 0;
+        int npgs = 0, rc = 0;
         ENTRY;
 
-        if (!PageLocked(page))
-                LBUG();
+        LASSERT(PageLocked(page));
+        LASSERT(!PageUptodate(page));
+        CDEBUG(D_VFSTRACE, "VFS Op\n");
 
-        if (inode->i_size <= offset) {
+        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
                 CERROR("reading beyond EOF\n");
                 memset(kmap(page), 0, PAGE_SIZE);
                 kunmap(page);
-                GOTO(readpage_out, rc);
+                SetPageUptodate(page);
+                unlock_page(page);
+                RETURN(rc);
         }
 
-        /* XXX Workaround for BA OSTs returning short reads at EOF.  The linux
-         *     OST will return the full page, zero-filled at the end, which
-         *     will just overwrite the data we set here.
-         *     Bug 593 relates to fixing this properly.
+        pgs = kmalloc(PTL_MD_MAX_IOV * sizeof(*pgs), GFP_USER);
+        set = pgs ? obd_brw_set_new() : NULL;
+        if ( set == NULL ) {
+                unlock_page(page); /* as on every other return path */
+                GOTO(out_pgs, rc = -ENOMEM);
+        }
+
+        /* arbitrarily try to read-ahead 8 times what we can pass on 
+         * the wire at once, clamped to file size */
+        end_index = first_page->index + 
+                8 * ((PTL_MD_MAX_IOV * PAGE_SIZE)>>PAGE_CACHE_SHIFT);
+        if ( end_index > inode->i_size >> PAGE_CACHE_SHIFT )
+                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
+
+        /*
+         * find how far we're allowed to read under the extent ll_file_read
+         * is passing us.. 
          */
-        if (inode->i_size < offset + PAGE_SIZE) {
-                int count = inode->i_size - offset;
-                void *addr = kmap(page);
-                //POISON(addr, 0x7c, count);
-                memset(addr + count, 0, PAGE_SIZE - count);
-                kunmap(page);
+        spin_lock(&lli->lli_read_extent_lock);
+        list_for_each(pos, &lli->lli_read_extents) {
+                struct ll_read_extent *rextent;
+                rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
+                if ( rextent->re_task != current )
+                        continue;
+
+                if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
+                        /* extent wrapping */
+                        extent_end = ~0;
+                else  {
+                        extent_end = ( rextent->re_extent.end + PAGE_SIZE )
+                                                        >> PAGE_CACHE_SHIFT;
+                        /* bytes -> page index: 32bit indexes, 64bit extents */
+                        if ( ((u64)extent_end << PAGE_CACHE_SHIFT ) < 
+                                        rextent->re_extent.end )
+                                extent_end = ~0;
+                }
+                break;
         }
+        spin_unlock(&lli->lli_read_extent_lock);
+
+        if ( extent_end == 0 ) {
+                CERROR("readpage outside ll_file_read, no lock held?\n");
+                end_index = page->index + 1;
+        } else if ( extent_end < end_index )
+                end_index = extent_end;
+
+        /* take a ref on first_page to match the find_get_page/grab_cache_page
+         * ref held on each later page; teardown drops one ref per page */
+        page_cache_get(page);
+        do { 
+                unsigned long index ;
+
+                pgs[npgs].pg = page;
+                pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
+                pgs[npgs].flag = 0;
+                pgs[npgs].count = PAGE_SIZE;
+                /* XXX Workaround for BA OSTs returning short reads at EOF.
+                 * The linux OST will return the full page, zero-filled at the
+                 * end, which will just overwrite the data we set here.  Bug
+                 * 593 relates to fixing this properly.
+                 */
+                if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
+                        int count = inode->i_size - pgs[npgs].off;
+                        void *addr = kmap(page);
+                        pgs[npgs].count = count;
+                        //POISON(addr, 0x7c, count);
+                        memset(addr + count, 0, PAGE_SIZE - count);
+                        kunmap(page);
+                }
+
+                npgs++;
+                if ( npgs == PTL_MD_MAX_IOV )
+                        break;
+
+                /*
+                 * find pages ahead of us that we can read in.  
+                 * grab_cache_page waits on pages that are locked so
+                 * we first try find_get_page, which doesn't.  this stops
+                 * the worst case behaviour of racing threads waiting on 
+                 * each other, but doesn't remove it entirely.
+                 */
+                for ( index = page->index + 1, page = NULL ;
+                        page == NULL && index < end_index ; index++ ) {
+
+                        /* see if the page already exists and needs updating */
+                        page = find_get_page(inode->i_mapping, index);
+                        if ( page ) {
+                                if ( Page_Uptodate(page) || TryLockPage(page) )
+                                        goto out_release;
+                                if ( !page->mapping || Page_Uptodate(page)) 
+                                        goto out_unlock;
+                        } else {
+                                /* ok, we have to create it.. */
+                                page = grab_cache_page(inode->i_mapping, index);
+                                if ( page == NULL ) 
+                                        continue;
+                                if ( Page_Uptodate(page) )
+                                        goto out_unlock;
+                        }
+
+                        break;
+
+                out_unlock:
+                        unlock_page(page);
+                out_release:
+                        page_cache_release(page);
+                        page = NULL;
+                }
 
-        if (PageUptodate(page)) {
-                CERROR("Explain this please?\n");
-                GOTO(readpage_out, rc);
+        } while (page);
+
+        set->brw_callback = ll_brw_sync_wait;
+        rc = obd_brw(OBD_BRW_READ, ll_i2obdconn(inode),
+                     ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL);
+        if (rc) {
+                CERROR("error from obd_brw: rc = %d\n", rc);
+        } else {
+                rc = ll_brw_sync_wait(set, CB_PHASE_START);
+                if (rc)
+                        CERROR("error from callback: rc = %d\n", rc);
         }
+        obd_brw_set_decref(set);
 
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
-        rc = ll_brw(OBD_BRW_READ, inode, page, 0);
-        EXIT;
+        while ( --npgs > -1 ) {
+                page = pgs[npgs].pg;
 
- readpage_out:
-        if (!rc)
-                SetPageUptodate(page);
-        unlock_page(page);
-        return 0;
+                if ( rc == 0 )
+                        SetPageUptodate(page);
+                unlock_page(page);
+                page_cache_release(page);
+        }
+out_pgs:
+        kfree(pgs);
+        RETURN(rc);
 } /* ll_readpage */
 
 void ll_truncate(struct inode *inode)
@@ -190,12 +310,14 @@ void ll_truncate(struct inode *inode)
         struct obdo oa = {0};
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct lustre_handle lockh = { 0, 0 };
+        struct ldlm_extent extent = {inode->i_size, OBD_OBJECT_EOF};
         int err;
         ENTRY;
 
         if (!lsm) {
                 /* object not yet allocated */
                 inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+                EXIT;
                 return;
         }
 
@@ -207,9 +329,11 @@ void ll_truncate(struct inode *inode)
         CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
                oa.o_id, inode->i_size);
 
-        err = ll_size_lock(inode, lsm, inode->i_size, LCK_PW, &lockh);
-        if (err) {
-                CERROR("ll_size_lock failed: %d\n", err);
+         /* i_size has already been set to the new size */
+        err = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW, 
+                                        &extent, &lockh);
+        if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+                EXIT;
                 return;
         }
 
@@ -221,9 +345,9 @@ void ll_truncate(struct inode *inode)
         else
                 obdo_to_inode(inode, &oa, oa.o_valid);
 
-        err = ll_size_unlock(inode, lsm, LCK_PW, &lockh);
+        err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
         if (err)
-                CERROR("ll_size_unlock failed: %d\n", err);
+                CERROR("ll_extent_unlock failed: %d\n", err);
 
         EXIT;
         return;
@@ -237,11 +361,12 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
         struct inode *inode = page->mapping->host;
         obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
         int rc = 0;
-        char *addr;
         ENTRY;
 
-        addr = kmap(page);
-        LASSERT(PageLocked(page));
+        ll_check_dirty(inode->i_sb);
+
+        if (!PageLocked(page))
+                LBUG();
 
         if (PageUptodate(page))
                 RETURN(0);
@@ -254,118 +379,83 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
                 RETURN(0);
         CDEBUG(D_VFSTRACE, "VFS Op\n");
 
-        /* If are writing to a new page, no need to read old data.  If we
-         * haven't already gotten the file size in ll_file_write() since
-         * we got our extent lock, we need to verify it here before we
-         * overwrite some other node's write (bug 445).
-         */
+        /* If we are writing to a new page, no need to read old data.
+         * The extent locking and getattr procedures in ll_file_write have
+         * guaranteed that i_size is stable enough for our zeroing needs. */
         if (inode->i_size <= offset) {
-                if (!S_ISBLK(inode->i_mode) && !(file->f_flags & O_APPEND)) {
-                        struct ll_file_data *fd = file->private_data;
-                        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-
-                        rc = ll_file_size(inode, lsm, fd->fd_ostdata);
-                        if (rc)
-                                GOTO(prepare_done, rc);
-                }
-                if (inode->i_size <= offset) {
-                        memset(addr, 0, PAGE_SIZE);
-                        GOTO(prepare_done, rc=0);
-                }
+                memset(kmap(page), 0, PAGE_SIZE);
+                kunmap(page);
+                GOTO(prepare_done, rc = 0);
         }
 
         rc = ll_brw(OBD_BRW_READ, inode, page, 0);
 
         EXIT;
  prepare_done:
-        if (!rc)
+        if (rc == 0)
                 SetPageUptodate(page);
-        else
-                kunmap (page);
 
         return rc;
 }
 
-/* Write a page from kupdated or kswapd.
+/*
+ * background file writeback.  This is called regularly from kupdated to write
+ * dirty data, from kswapd when memory is low, and from filemap_fdatasync when
+ * super blocks or inodes are synced..
  *
- * We unlock the page even in the face of an error, otherwise dirty
- * pages could OOM the system if they cannot be written.  Also, there
- * is nobody to return an error code to from here - the application
- * may not even be running anymore.
+ * obd_brw errors down in _batch_writepage are ignored, so pages are always
+ * unlocked.  Also, there is nobody to return an error code to from here - the
+ * application may not even be running anymore.
  *
- * Returns the page unlocked, but with a reference.
+ * this should be async so that things like kswapd can have a chance to
+ * free some more pages that our allocating writeback may need, but it isn't
+ * yet.
  */
-static int ll_writepage(struct page *page) {
+static int ll_writepage(struct page *page)
+{
         struct inode *inode = page->mapping->host;
-        int err;
         ENTRY;
 
-        LASSERT(PageLocked(page));
-
-        /* XXX need to make sure we have LDLM lock on this page */
+        CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
+                        PageLaunder(page), inode);
         CDEBUG(D_VFSTRACE, "VFS Op\n");
-        err = ll_brw(OBD_BRW_WRITE, inode, page, 1);
-        if (err)
-                CERROR("ll_brw failure %d\n", err);
-        else
-                set_page_clean(page);
+        LASSERT(PageLocked(page));
 
-        unlock_page(page);
-        RETURN(err);
+        /* XXX should obd_brw errors trickle up? */
+        ll_batch_writepage(inode, page);
+        RETURN(0);
 }
 
-
-/* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated
- * too */
+/*
+ * we really don't want to start writeback here, we want to give callers some
+ * time to further dirty the pages before we write them out.
+ */
 static int ll_commit_write(struct file *file, struct page *page,
                            unsigned from, unsigned to)
 {
         struct inode *inode = page->mapping->host;
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *md = lli->lli_smd;
-        struct brw_page pg;
-        struct obd_brw_set *set;
-        int rc, create = 1;
         loff_t size;
         ENTRY;
 
-        pg.pg = page;
-        pg.count = to;
-        /* XXX make the starting offset "from" */
-        pg.off = (((obd_off)page->index) << PAGE_SHIFT);
-        pg.flag = create ? OBD_BRW_CREATE : 0;
-
-        set = obd_brw_set_new();
-        if (set == NULL)
-                RETURN(-ENOMEM);
-
-        SetPageUptodate(page);
-
-        if (!PageLocked(page))
-                LBUG();
+        LASSERT(inode == file->f_dentry->d_inode);
+        LASSERT(PageLocked(page));
 
         CDEBUG(D_VFSTRACE, "VFS Op\n");
-        CDEBUG(D_INODE, "commit_page writing (off "LPD64"), count %d\n",
-               pg.off, pg.count);
+        CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
+               inode, page, from, to, page->index);
 
-        set->brw_callback = ll_brw_sync_wait;
-        rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, 1, &pg, set, NULL);
-        if (rc)
-                CERROR("error from obd_brw: rc = %d\n", rc);
-        else {
-                rc = ll_brw_sync_wait(set, CB_PHASE_START);
-                if (rc)
-                        CERROR("error from callback: rc = %d\n", rc);
-        }
-        obd_brw_set_free(set);
-        kunmap(page);
+        /* to match full page case in prepare_write */
+        SetPageUptodate(page);
+        /* mark the page dirty, put it on mapping->dirty,
+         * mark the inode PAGES_DIRTY, put it on sb->dirty */
+        set_page_dirty(page);
 
-        size = pg.off + pg.count;
-        /* do NOT truncate when writing in the middle of a file */
+        /* this is matched by a hack in obdo_to_inode at the moment */
+        size = (((obd_off)page->index) << PAGE_SHIFT) + to;
         if (size > inode->i_size)
                 inode->i_size = size;
 
-        RETURN(rc);
+        RETURN(0);
 } /* ll_commit_write */
 
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
@@ -384,11 +474,17 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
         if (!lsm || !lsm->lsm_object_id)
                 RETURN(-ENOMEM);
 
+        if ((iobuf->offset & (blocksize - 1)) ||
+            (iobuf->length & (blocksize - 1)))
+                RETURN(-EINVAL);
+
+#if 0
         /* XXX Keep here until we find ia64 problem, it crashes otherwise */
         if (blocksize != PAGE_SIZE) {
                 CERROR("direct_IO blocksize != PAGE_SIZE\n");
                 RETURN(-EINVAL);
         }
+#endif
 
         set = obd_brw_set_new();
         if (set == NULL)
@@ -396,17 +492,12 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
 
         OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
         if (!pga) {
-                obd_brw_set_free(set);
+                obd_brw_set_decref(set);
                 RETURN(-ENOMEM);
         }
 
-        CDEBUG(D_PAGE, "blocksize %u, blocknr %lu, iobuf %p: nr_pages %u, "
-                       "array_len %u, offset %u, length %u\n",
-               blocksize, blocknr, iobuf, iobuf->nr_pages,
-               iobuf->array_len, iobuf->offset, iobuf->length);
-
         flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
-        offset = (blocknr << inode->i_blkbits) /* + iobuf->offset? */;
+        offset = (blocknr << inode->i_blkbits);
         length = iobuf->length;
 
         for (i = 0, length = iobuf->length; length > 0;
@@ -417,8 +508,6 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
                 pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
                                      length);
                 pga[i].flag = flags;
-                CDEBUG(D_PAGE, "page %d (%p), offset "LPU64", count %u\n",
-                       i, pga[i].pg, pga[i].off, pga[i].count);
                 if (rw == READ) {
                         //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
                         //kunmap(iobuf->maplist[i]);
@@ -436,7 +525,7 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
                 if (rc)
                         CERROR("error from callback: rc = %d\n", rc);
         }
-        obd_brw_set_free(set);
+        obd_brw_set_decref(set);
         if (rc == 0)
                 rc = iobuf->length;
 
@@ -445,52 +534,8 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
 }
 #endif
 
-int ll_flush_inode_pages(struct inode * inode)
-{
-        obd_count        bufs_per_obdo = 0;
-        obd_size         *count = NULL;
-        obd_off          *offset = NULL;
-        obd_flag         *flags = NULL;
-        int              err = 0;
-
-        ENTRY;
-
-#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
-        spin_lock(&pagecache_lock);
-
-        spin_unlock(&pagecache_lock);
-#endif
-
-
-        OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo);
-        OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo);
-        OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo);
-        if (!count || !offset || !flags)
-                GOTO(out, err=-ENOMEM);
-
-#if 0
-        for (i = 0 ; i < bufs_per_obdo ; i++) {
-                count[i] = PAGE_SIZE;
-                offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT;
-                flags[i] = OBD_BRW_CREATE;
-        }
-
-        err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
-                      ll_i2info(inode)->lli_smd, bufs_per_obdo,
-                      iobuf->maplist, count, offset, flags, NULL, NULL);
-        if (err == 0)
-                err = bufs_per_obdo * 4096;
-#endif
- out:
-        OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo);
-        OBD_FREE(count, sizeof(*count) * bufs_per_obdo);
-        OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo);
-        RETURN(err);
-}
-
 //#endif
 
-
 struct address_space_operations ll_aops = {
         readpage: ll_readpage,
 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
index 613c42f..ff754a0 100644 (file)
@@ -57,14 +57,14 @@ static char *ll_read_opt(const char *opt, char *data)
         ENTRY;
 
         CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
-        if ( strncmp(opt, data, strlen(opt)) )
+        if (strncmp(opt, data, strlen(opt)))
                 RETURN(NULL);
-        if ( (value = strchr(data, '=')) == NULL )
+        if ((value = strchr(data, '=')) == NULL)
                 RETURN(NULL);
 
         value++;
         OBD_ALLOC(retval, strlen(value) + 1);
-        if ( !retval ) {
+        if (!retval) {
                 CERROR("out of memory!\n");
                 RETURN(NULL);
         }
@@ -79,7 +79,7 @@ static int ll_set_opt(const char *opt, char *data, int fl)
         ENTRY;
 
         CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
-        if ( strncmp(opt, data, strlen(opt)) )
+        if (strncmp(opt, data, strlen(opt)))
                 RETURN(0);
         else
                 RETURN(fl);
@@ -99,10 +99,11 @@ static void ll_options(char *options, char **ost, char **mds, int *flags)
              this_char != NULL;
              this_char = strtok (NULL, ",")) {
                 CDEBUG(D_SUPER, "this_char %s\n", this_char);
-                if ( (!*ost && (*ost = ll_read_opt("osc", this_char)))||
-                     (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
-                     (!(*flags & LL_SBI_NOLCK) && ((*flags) = (*flags) |
-                      ll_set_opt("nolock", this_char, LL_SBI_NOLCK))) )
+                if ((!*ost && (*ost = ll_read_opt("osc", this_char)))||
+                    (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
+                    (!(*flags & LL_SBI_NOLCK) &&
+                     ((*flags) = (*flags) |
+                      ll_set_opt("nolock", this_char, LL_SBI_NOLCK))))
                         continue;
         }
         EXIT;
@@ -466,6 +467,20 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
         ENTRY;
 
         if ((attr->ia_valid & ATTR_SIZE)) {
+                /* writeback uses inode->i_size to determine how far out
+                 * its cached pages go.  ll_truncate gets a PW lock, canceling
+                 * our lock, _after_ it has updated i_size.  this can confuse
+                 * us into zero extending the file to the newly truncated
+                 * size, and this has bad implications for a racing o_append.
+                 * if we're extending our size we need to flush the pages
+                 * with the correct i_size before vmtruncate stomps on
+                 * the new i_size.  again, this can only find pages to
+                 * purge if the PW lock that generated them is still held.
+                 */
+                if ( attr->ia_size > inode->i_size ) {
+                        filemap_fdatasync(inode->i_mapping);
+                        filemap_fdatawait(inode->i_mapping);
+                }
                 err = vmtruncate(inode, attr->ia_size);
                 if (err)
                         RETURN(err);
@@ -613,21 +628,32 @@ static void ll_read_inode2(struct inode *inode, void *opaque)
 
         CDEBUG(D_VFSTRACE, "VFS Op\n");
         sema_init(&lli->lli_open_sem, 1);
+        atomic_set(&lli->lli_open_count, 0);
+        lli->lli_flags = 0;
+        init_MUTEX(&lli->lli_getattr_sem);
+        spin_lock_init(&lli->lli_read_extent_lock);
+        INIT_LIST_HEAD(&lli->lli_read_extents);
 
         LASSERT(!lli->lli_smd);
 
-        /* core attributes first */
+        /* core attributes from the MDS first */
         ll_update_inode(inode, body, lic ? lic->lic_lmm : NULL);
 
         /* Get the authoritative file size */
         if (lli->lli_smd && (inode->i_mode & S_IFREG)) {
-                int rc;
+                struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+                struct lustre_handle lockh = {0, 0};
+                struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+                ldlm_error_t rc;
+
                 LASSERT(lli->lli_smd->lsm_object_id != 0);
-                rc = ll_file_size(inode, lli->lli_smd, NULL);
-                if (rc) {
-                        CERROR("ll_file_size: %d\n", rc);
+
+                rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
+                if (rc != ELDLM_OK && rc != ELDLM_LOCK_MATCHED) {
                         ll_clear_inode(inode);
                         make_bad_inode(inode);
+                } else {
+                        ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
                 }
         }
 
index f77fdea..f296d10 100644 (file)
@@ -63,14 +63,14 @@ static char *ll_read_opt(const char *opt, char *data)
         ENTRY;
 
         CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
-        if ( strncmp(opt, data, strlen(opt)) )
+        if (strncmp(opt, data, strlen(opt)))
                 RETURN(NULL);
-        if ( (value = strchr(data, '=')) == NULL )
+        if ((value = strchr(data, '=')) == NULL)
                 RETURN(NULL);
 
         value++;
         OBD_ALLOC(retval, strlen(value) + 1);
-        if ( !retval ) {
+        if (!retval) {
                 CERROR("out of memory!\n");
                 RETURN(NULL);
         }
@@ -85,7 +85,7 @@ static int ll_set_opt(const char *opt, char *data, int fl)
         ENTRY;
 
         CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
-        if ( strncmp(opt, data, strlen(opt)) )
+        if (strncmp(opt, data, strlen(opt)))
                 RETURN(0);
         else
                 RETURN(fl);
@@ -104,10 +104,11 @@ static void ll_options(char *options, char **ost, char **mds, int *flags)
 
         while ((this_char = strsep (&opt_ptr, ",")) != NULL) {
                 CDEBUG(D_SUPER, "this_char %s\n", this_char);
-                if ( (!*ost && (*ost = ll_read_opt("osc", this_char)))||
-                     (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
-                     (!(*flags & LL_SBI_NOLCK) && ((*flags) = (*flags) |
-                      ll_set_opt("nolock", this_char, LL_SBI_NOLCK))) )
+                if ((!*ost && (*ost = ll_read_opt("osc", this_char)))||
+                    (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
+                    (!(*flags & LL_SBI_NOLCK) &&
+                     ((*flags) = (*flags) |
+                      ll_set_opt("nolock", this_char, LL_SBI_NOLCK))))
                         continue;
         }
         EXIT;
@@ -572,6 +573,11 @@ int ll_read_inode2(struct inode *inode, void *opaque)
         ENTRY;
 
         sema_init(&lli->lli_open_sem, 1);
+        lli->lli_flags = 0;
+        init_MUTEX(&lli->lli_getattr_sem);
+        /* these are 2.4 only, but putting them here for consistency.. */
+        spin_lock_init(&lli->lli_read_extent_lock);
+        INIT_LIST_HEAD(&lli->lli_read_extents);
 
         LASSERT(!lli->lli_smd);
 
@@ -580,12 +586,19 @@ int ll_read_inode2(struct inode *inode, void *opaque)
 
         /* Get the authoritative file size */
         if (lli->lli_smd && S_ISREG(inode->i_mode)) {
-                rc = ll_file_size(inode, lli->lli_smd, NULL);
-                if (rc) {
-                        CERROR("ll_file_size: %d\n", rc);
+                struct lov_stripe_md *lsm = lli->lli_smd;
+                struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+                struct lustre_handle lockh = {0, 0};
+
+                LASSERT(lli->lli_smd->lsm_object_id != 0);
+
+                /* no struct file here, so no ll_file_data: pass NULL */
+                rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
+                if (rc != ELDLM_OK && rc != ELDLM_LOCK_MATCHED) {
                         ll_clear_inode(inode);
                         make_bad_inode(inode);
-                        RETURN(rc);
+                } else {
+                        ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
                 }
         }
 
@@ -661,6 +674,7 @@ static struct inode *ll_alloc_inode(struct super_block *sb)
 
         memset(lli, 0, (char *)&lli->lli_vfs_inode - (char *)lli);
         sema_init(&lli->lli_open_sem, 1);
+        init_MUTEX(&lli->lli_size_valid_sem);
 
         return &lli->lli_vfs_inode;
 }
index 0e7ad82..19738b9 100644 (file)
@@ -1267,7 +1267,8 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         struct lov_obd *lov;
         struct lov_oinfo *loi;
         struct lov_stripe_md submd;
-        int rc = 0, i;
+        ldlm_error_t rc = ELDLM_LOCK_MATCHED, err;
+        int i;
         ENTRY;
 
         if (!lsm) {
@@ -1322,20 +1323,27 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 submd.lsm_stripe_count = 0;
                 /* XXX submd is not fully initialized here */
                 *flags = 0;
-                rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
-                                 parent_lock, type, &sub_ext, sizeof(sub_ext),
-                                 mode, flags, cb, data, datalen, lov_lockhp);
+                err = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
+                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
+                                  mode, flags, cb, data, datalen, lov_lockhp);
+
                 // XXX add a lock debug statement here
-                if (rc)
+                /* return _MATCHED only when all locks matched.. */
+                if (err == ELDLM_OK) {
+                        rc = ELDLM_OK;
+                } else if (err != ELDLM_LOCK_MATCHED) {
+                        rc = err;
                         memset(lov_lockhp, 0, sizeof(*lov_lockhp));
-                if (rc && lov->tgts[loi->loi_ost_idx].active) {
-                        CERROR("error: enqueue objid "LPX64" subobj "LPX64
-                               " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
-                               loi->loi_id, loi->loi_ost_idx, rc);
-                        goto out_locks;
+                        if (lov->tgts[loi->loi_ost_idx].active) {
+                                CERROR("error: enqueue objid "LPX64" subobj "
+                                       LPX64" on OST idx %d: rc = %d\n",
+                                       lsm->lsm_object_id, loi->loi_id,
+                                       loi->loi_ost_idx, rc);
+                                goto out_locks;
+                        }
                 }
         }
-        RETURN(0);
+        RETURN(rc);
 
 out_locks:
         while (loi--, lov_lockhp--, i-- > 0) {
@@ -1408,7 +1416,7 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
 
         lov = &export->exp_obd->u.lov;
         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
-             i++, loi++, lov_lockhp++ ) {
+             i++, loi++, lov_lockhp++) {
                 struct lov_stripe_md submd;
                 int err;
 
index 39e8592..7952101 100644 (file)
@@ -170,6 +170,9 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
         __u64 last_transno = 0;
         __u64 last_mount;
         int rc = 0;
+        LASSERT(sizeof(struct mds_client_data) == MDS_LR_SIZE);
+        LASSERT(sizeof(struct mds_server_data) <= MDS_LR_CLIENT);
 
         OBD_ALLOC(msd, sizeof(*msd));
         if (!msd)
index 6200acd..6209d75 100644 (file)
@@ -86,12 +86,11 @@ static int obd_class_open(struct inode * inode, struct file * file)
         struct obd_class_user_state *ocus;
         ENTRY;
 
-        OBD_ALLOC (ocus, sizeof (*ocus));
+        OBD_ALLOC(ocus, sizeof(*ocus));
         if (ocus == NULL)
                 return (-ENOMEM);
 
-        INIT_LIST_HEAD (&ocus->ocus_conns);
-        ocus->ocus_current_obd = NULL;
+        INIT_LIST_HEAD(&ocus->ocus_conns);
         file->private_data = ocus;
 
         MOD_INC_USE_COUNT;
@@ -209,6 +208,9 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd,
         int err = 0, len = 0, serialised = 0;
         ENTRY;
 
+        if ((cmd & 0xffffff00) == ((int)'T') << 8) /* ignore all tty ioctls */
+                RETURN(err = -ENOTTY);
+
         switch (cmd) {
         case OBD_IOC_BRW_WRITE:
         case OBD_IOC_BRW_READ:
@@ -222,7 +224,8 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd,
                 break;
         }
 
-        if (!obd && cmd != OBD_IOC_DEVICE && cmd != TCGETS &&
+        CDEBUG(D_IOCTL, "cmd = %x, obd = %p\n", cmd, obd);
+        if (!obd && cmd != OBD_IOC_DEVICE &&
             cmd != OBD_IOC_LIST && cmd != OBD_GET_VERSION &&
             cmd != OBD_IOC_NAME2DEV && cmd != OBD_IOC_NEWDEV &&
             cmd != OBD_IOC_ADD_UUID && cmd != OBD_IOC_DEL_UUID  &&
@@ -237,8 +240,6 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd,
         data = (struct obd_ioctl_data *)buf;
 
         switch (cmd) {
-        case TCGETS:
-                GOTO(out, err=-EINVAL);
         case OBD_IOC_DEVICE: {
                 CDEBUG(D_IOCTL, "\n");
                 if (data->ioc_dev >= MAX_OBD_DEVICES || data->ioc_dev < 0) {
@@ -266,6 +267,7 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd,
                         int l;
                         char *status;
                         struct obd_device *obd = &obd_dev[i];
+
                         if (!obd->obd_type)
                                 continue;
                         if (obd->obd_flags & OBD_SET_UP)
@@ -663,17 +665,17 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd,
 #define OBD_MINOR 241
 #ifdef __KERNEL__
 /* to control /dev/obd */
-static int obd_class_ioctl (struct inode * inode, struct file * filp,
-                     unsigned int cmd, unsigned long arg)
+static int obd_class_ioctl(struct inode *inode, struct file *filp,
+                           unsigned int cmd, unsigned long arg)
 {
         return class_handle_ioctl(filp->private_data, cmd, arg);
 }
 
 /* declare character device */
 static struct file_operations obd_psdev_fops = {
-        ioctl: obd_class_ioctl,      /* ioctl */
-        open: obd_class_open,        /* open */
-        release: obd_class_release,  /* release */
+        ioctl:   obd_class_ioctl,       /* ioctl */
+        open:    obd_class_open,        /* open */
+        release: obd_class_release,     /* release */
 };
 
 /* modules setup */
@@ -712,6 +714,7 @@ void obd_kmap_get(int count, int server)
         if (count == 1)
                 atomic_dec(&obd_kmap_count);
         else while (atomic_add_negative(-count, &obd_kmap_count)) {
+                struct l_wait_info lwi = { 0 };
                 static long next_show = 0;
                 static int skipped = 0;
 
@@ -729,8 +732,8 @@ void obd_kmap_get(int count, int server)
                         skipped = 0;
                 } else
                         skipped++;
-                wait_event(obd_kmap_waitq,
-                           atomic_read(&obd_kmap_count) >= count);
+                l_wait_event(obd_kmap_waitq,
+                             atomic_read(&obd_kmap_count) >= count, &lwi);
         }
 }
 
index 6fcf504..bd43554 100644 (file)
@@ -121,7 +121,7 @@ int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
 
         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
                                               vars, type);
-        if (type->typ_procroot && IS_ERR(type->typ_procroot)) {
+        if (IS_ERR(type->typ_procroot)) {
                 rc = PTR_ERR(type->typ_procroot);
                 type->typ_procroot = NULL;
                 list_del(&type->typ_chain);
@@ -328,15 +328,14 @@ struct obd_import *class_conn2ldlmimp(struct lustre_handle *conn)
 
 struct obd_export *class_new_export(struct obd_device *obddev)
 {
-        struct obd_export * export;
+        struct obd_export *export;
 
-        export = kmem_cache_alloc(export_cachep, GFP_KERNEL);
+        PORTAL_SLAB_ALLOC(export, export_cachep, sizeof(*export));
         if (!export) {
                 CERROR("no memory! (minor %d)\n", obddev->obd_minor);
                 return NULL;
         }
 
-        memset(export, 0, sizeof(*export));
         get_random_bytes(&export->exp_cookie, sizeof(export->exp_cookie));
         export->exp_obd = obddev;
         /* XXX this should be in LDLM init */
@@ -374,8 +373,7 @@ void class_destroy_export(struct obd_export *exp)
                 ptlrpc_abort_inflight_superhack(&exp->exp_ldlm_data.led_import,
                                                 1);
 
-        exp->exp_cookie = DEAD_HANDLE_MAGIC;
-        kmem_cache_free(export_cachep, exp);
+        PORTAL_SLAB_FREE(exp, export_cachep, sizeof(*exp));
 }
 
 /* a connection defines an export context in which preallocation can
index 9f7544b..2239762 100644 (file)
@@ -515,7 +515,7 @@ echo_client_kbrw (struct obd_device *obd, int rw,
         }
         OBD_FREE(pga, npages * sizeof(*pga));
  out_0:
-        obd_brw_set_free(set);
+        obd_brw_set_decref(set);
         return (rc);
 }
 
@@ -594,7 +594,7 @@ static int echo_client_ubrw(struct obd_device *obd, int rw,
  out_1:
         OBD_FREE(pga, npages * sizeof(*pga));
  out_0:
-        obd_brw_set_free(set);
+        obd_brw_set_decref(set);
         return (rc);
 }
 #else
index 7e17804..4e4e8b1 100644 (file)
@@ -3,7 +3,6 @@
 # This code is issued under the GNU General Public License.
 # See the file COPYING in this distribution
 
-DEFS = $(ENABLE_OST_RECOVERY)
 MODULE = obdfilter
 modulefs_DATA = obdfilter.o
 EXTRA_PROGRAMS = obdfilter
index 8486c22..0632af0 100644 (file)
@@ -296,7 +296,7 @@ int filter_finish_transno(struct obd_export *export, void *handle,
 #endif
         if (written == sizeof(*fcd))
                 RETURN(0);
-        CERROR("error writing to last_rcvd file: rc = %d\n", written);
+        CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
         if (written >= 0)
                 RETURN(-EIO);
 
@@ -506,7 +506,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
         struct filter_client_data *fcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = inode->i_size;
-        __u64 mount_count;
+        __u64 mount_count = 0;
         int cl_idx;
         loff_t off = 0;
         int rc;
@@ -545,8 +545,9 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
                 ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
                                               &off);
                 if (retval != sizeof(*fsd)) {
-                        CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
-                        GOTO(out, rc = -EIO);
+                        CDEBUG(D_INODE,"OBD filter: error reading %s\n",
+                               LAST_RCVD);
+                        GOTO(err_fsd, rc = -EIO);
                 }
                 mount_count = le64_to_cpu(fsd->fsd_mount_count);
                 filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
@@ -555,13 +556,13 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
         if (fsd->fsd_feature_incompat) {
                 CERROR("unsupported feature %x\n",
                        le32_to_cpu(fsd->fsd_feature_incompat));
-                RETURN(-EINVAL);
+                GOTO(err_fsd, rc = -EINVAL);
         }
         if (fsd->fsd_feature_rocompat) {
                 CERROR("read-only feature %x\n",
                        le32_to_cpu(fsd->fsd_feature_rocompat));
                 /* Do something like remount filesystem read-only */
-                RETURN(-EINVAL);
+                GOTO(err_fsd, rc = -EINVAL);
         }
 
         CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
@@ -584,86 +585,89 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
          * the header.  If we find clients with higher last_rcvd values
          * then those clients may need recovery done.
          */
-        if (obd->obd_flags & OBD_REPLAYABLE) {
-                for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
-                        __u64 last_rcvd;
-                        int mount_age;
-
-                        if (!fcd) {
-                                OBD_ALLOC(fcd, sizeof(*fcd));
-                                if (!fcd)
-                                        GOTO(err_fsd, rc = -ENOMEM);
-                        }
+        if (!(obd->obd_flags & OBD_REPLAYABLE)) {
+                CERROR("%s: recovery support OFF\n", obd->obd_name);
+                GOTO(out, rc = 0);
+        }
 
-                        /* Don't assume off is incremented properly, in case
-                         * sizeof(fsd) isn't the same as fsd->fsd_client_size.
-                         */
-                        off = le32_to_cpu(fsd->fsd_client_start) +
-                                cl_idx * le16_to_cpu(fsd->fsd_client_size);
-                        rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
-                        if (rc != sizeof(*fcd)) {
-                                CERROR("error reading FILTER %s offset %d: rc = %d\n",
-                                       LAST_RCVD, cl_idx, rc);
-                                if (rc > 0) /* XXX fatal error or just abort reading? */
-                                        rc = -EIO;
-                                break;
-                        }
+        for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
+                __u64 last_rcvd;
+                int mount_age;
 
-                        if (fcd->fcd_uuid[0] == '\0') {
-                                CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
-                                       cl_idx);
-                                continue;
-                        }
+                if (!fcd) {
+                        OBD_ALLOC(fcd, sizeof(*fcd));
+                        if (!fcd)
+                                GOTO(err_fsd, rc = -ENOMEM);
+                }
 
-                        last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
+                /* Don't assume off is incremented properly, in case
+                 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
+                 */
+                off = le32_to_cpu(fsd->fsd_client_start) +
+                        cl_idx * le16_to_cpu(fsd->fsd_client_size);
+                rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
+                if (rc != sizeof(*fcd)) {
+                        CERROR("error reading FILTER %s offset %d: rc = %d\n",
+                               LAST_RCVD, cl_idx, rc);
+                        if (rc > 0) /* XXX fatal error or just abort reading? */
+                                rc = -EIO;
+                        break;
+                }
 
-                        /* These exports are cleaned up by filter_disconnect(), so they
-                         * need to be set up like real exports as filter_connect() does.
-                         */
-                        mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
-                        if (mount_age < FILTER_MOUNT_RECOV) {
-                                struct obd_export *exp = class_new_export(obd);
-                                struct filter_export_data *fed;
-                                CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                                       " srv lr: "LPU64" mnt: "LPU64" last mount: "
-                                       LPU64"\n", fcd->fcd_uuid, cl_idx,
-                                       last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
-                                       le64_to_cpu(fcd->fcd_mount_count), mount_count);
-                                /* disabled until OST recovery is actually working */
-
-                                if (!exp) {
-                                        rc = -ENOMEM;
-                                        break;
-                                }
-                                memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
-                                       sizeof exp->exp_client_uuid.uuid);
-                                fed = &exp->exp_filter_data;
-                                fed->fed_fcd = fcd;
-                                filter_client_add(filter, fed, cl_idx);
-                                /* create helper if export init gets more complex */
-                                INIT_LIST_HEAD(&fed->fed_open_head);
-                                spin_lock_init(&fed->fed_lock);
-
-                                fcd = NULL;
-                                obd->obd_recoverable_clients++;
-                        } else {
-                                CDEBUG(D_INFO,
-                                       "discarded client %d UUID '%s' count "LPU64"\n",
-                                       cl_idx, fcd->fcd_uuid,
-                                       le64_to_cpu(fcd->fcd_mount_count));
-                        }
+                if (fcd->fcd_uuid[0] == '\0') {
+                        CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+                               cl_idx);
+                        continue;
+                }
 
-                        CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
-                               cl_idx, last_rcvd);
+                last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
 
-                        if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
-                                filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
+                /* These exports are cleaned up by filter_disconnect(), so they
+                 * need to be set up like real exports as filter_connect() does.
+                 */
+                mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
+                if (mount_age < FILTER_MOUNT_RECOV) {
+                        struct obd_export *exp = class_new_export(obd);
+                        struct filter_export_data *fed;
+                        CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                               " srv lr: "LPU64" mnt: "LPU64" last mount: "
+                               LPU64"\n", fcd->fcd_uuid, cl_idx,
+                               last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
+                               le64_to_cpu(fcd->fcd_mount_count), mount_count);
+                        /* disabled until OST recovery is actually working */
+
+                        if (!exp) {
+                                rc = -ENOMEM;
+                                break;
+                        }
+                        memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
+                               sizeof exp->exp_client_uuid.uuid);
+                        fed = &exp->exp_filter_data;
+                        fed->fed_fcd = fcd;
+                        filter_client_add(filter, fed, cl_idx);
+                        /* create helper if export init gets more complex */
+                        INIT_LIST_HEAD(&fed->fed_open_head);
+                        spin_lock_init(&fed->fed_lock);
+
+                        fcd = NULL;
+                        obd->obd_recoverable_clients++;
+                } else {
+                        CDEBUG(D_INFO,
+                               "discarded client %d UUID '%s' count "LPU64"\n",
+                               cl_idx, fcd->fcd_uuid,
+                               le64_to_cpu(fcd->fcd_mount_count));
                 }
 
+                CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
+                       cl_idx, last_rcvd);
+
+                if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
+                        filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
+
                 obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
                 if (obd->obd_recoverable_clients) {
-                        CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
-                               obd->obd_recoverable_clients,
+                        CERROR("RECOVERY: %d recoverable clients, last_rcvd "
+                               LPU64"\n", obd->obd_recoverable_clients,
                                le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
                         obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
                         obd->obd_flags |= OBD_RECOVERING;
@@ -672,16 +676,14 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp,
                 if (fcd)
                         OBD_FREE(fcd, sizeof(*fcd));
 
-        } else {
-                CERROR("%s: recovery support OFF\n", obd->obd_name);
         }
 
+out:
         fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
 
         /* save it,so mount count and last_recvd is current */
         rc = filter_update_server_data(filp, filter->fo_fsd);
 
-out:
         RETURN(rc);
 
 err_fsd:
@@ -768,7 +770,7 @@ static int filter_prep(struct obd_device *obd)
         if (filter->fo_subdir_count) {
                 O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
                 OBD_ALLOC(filter->fo_dentry_O_sub,
-                          FILTER_SUBDIR_COUNT * sizeof(dentry));
+                          filter->fo_subdir_count * sizeof(dentry));
                 if (!filter->fo_dentry_O_sub)
                         GOTO(err_client, rc = -ENOMEM);
 
@@ -1144,17 +1146,14 @@ static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
 
         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
         rc = PTR_ERR(mnt);
-        if (IS_ERR(mnt)) {
-                CERROR("mount of %s as type %s failed: rc %d\n",
-                       data->ioc_inlbuf2, data->ioc_inlbuf1, rc);
+        if (IS_ERR(mnt))
                 GOTO(err_ops, rc);
-        }
 
 #if OST_RECOVERY
         obd->obd_flags |= OBD_REPLAYABLE;
 #endif
 
-        filter = &obd->u.filter;;
+        filter = &obd->u.filter;
         filter->fo_vfsmnt = mnt;
         filter->fo_fstype = strdup(data->ioc_inlbuf2);
         filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
@@ -1293,8 +1292,15 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
                 RETURN(rc);
         exp = class_conn2export(conn);
         LASSERT(exp);
+
         fed = &exp->exp_filter_data;
 
+        INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
+        spin_lock_init(&exp->exp_filter_data.fed_lock);
+
+        if (!(obd->obd_flags & OBD_REPLAYABLE))
+                RETURN(0);
+
         OBD_ALLOC(fcd, sizeof(*fcd));
         if (!fcd) {
                 CERROR("filter: out of memory for client data\n");
@@ -1305,14 +1311,9 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
         fed->fed_fcd = fcd;
         fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
 
-        INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
-        spin_lock_init(&exp->exp_filter_data.fed_lock);
-
-        if (obd->obd_flags & OBD_REPLAYABLE) {
-                rc = filter_client_add(filter, fed, -1);
-                if (rc)
-                        GOTO(out_fcd, rc);
-        }
+        rc = filter_client_add(filter, fed, -1);
+        if (rc)
+                GOTO(out_fcd, rc);
 
         RETURN(rc);
 
@@ -1355,7 +1356,7 @@ static int filter_disconnect(struct lustre_handle *conn)
 
         ldlm_cancel_locks_for_export(exp);
 
-        if (exp->exp_obd->obd_flags & OBD_REPLAYABLE) 
+        if (exp->exp_obd->obd_flags & OBD_REPLAYABLE)
                 filter_client_free(exp);
 
         rc = class_disconnect(conn);
@@ -1638,7 +1639,7 @@ static int filter_create(struct lustre_handle *conn, struct obdo *oa,
 
                 /* This would only happen if lastobjid was bad on disk */
                 CERROR("objid %s already exists\n",
-                       filter_id(buf, filter, S_IFREG, oa->o_id));
+                       filter_id(buf, filter, oa->o_mode, oa->o_id));
                 LBUG();
                 GOTO(out, rc = -EEXIST);
         }
@@ -1912,14 +1913,6 @@ struct page *filter_get_page_write(struct inode *inode,
 
 
         /* This page is currently locked, so get a temporary page instead. */
-        /* XXX I believe this is a very dangerous thing to do - consider if
-         *     we had multiple writers for the same file (definitely the case
-         *     if we are using this codepath).  If writer A locks the page,
-         *     writer B writes to a copy (as here), writer A drops the page
-         *     lock, and writer C grabs the lock before B does, then B will
-         *     later overwrite the data from C, even if C had LDLM locked
-         *     and initiated the write after B did.
-         */
         if (!page) {
                 unsigned long addr;
                 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
@@ -2052,7 +2045,7 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
                                                               o->ioo_id),
                                            o->ioo_id, 0);
 
-                if (IS_ERR(dentry))
+                if (IS_ERR(dentry)) 
                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
 
                 fso[i].fso_dentry = dentry;
@@ -2395,6 +2388,7 @@ static int filter_san_preprw(int cmd, struct lustre_handle *conn,
         for (i = 0; i < objcount; i++, o++) {
                 struct dentry *dentry;
                 struct inode *inode;
+                int (*fs_bmap)(struct address_space *, long);
                 int j;
 
                 dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
@@ -2409,15 +2403,15 @@ static int filter_san_preprw(int cmd, struct lustre_handle *conn,
                         f_dput(dentry);
                         GOTO(out, rc = -ENOENT);
                 }
+                fs_bmap = inode->i_mapping->a_ops->bmap;
 
                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
                         long block;
 
-                        block = rnb->offset >> PAGE_SHIFT;
+                        block = rnb->offset >> inode->i_blkbits;
 
                         if (cmd == OBD_BRW_READ) {
-                                block = inode->i_mapping->a_ops->bmap(
-                                                inode->i_mapping, block);
+                                block = fs_bmap(inode->i_mapping, block);
                         } else {
                                 loff_t newsize = rnb->offset + rnb->len;
                                 /* fs_prep_san_write will also update inode
@@ -2496,6 +2490,8 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
         unsigned long index = 0;
         int err = 0;
 
+        LBUG(); /* THIS CODE IS NOT CORRECT -phil */
+
         memset(&srcmd, 0, sizeof(srcmd));
         memset(&dstmd, 0, sizeof(dstmd));
         srcmd.lsm_object_id = src->o_id;
@@ -2539,7 +2535,7 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
                 page->index = index;
                 set->brw_callback = ll_brw_sync_wait;
                 err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
-                obd_brw_set_free(set);
+                obd_brw_set_decref(set);
                 if (err) {
                         EXIT;
                         break;
@@ -2556,7 +2552,7 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
 
                 set->brw_callback = ll_brw_sync_wait;
                 err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
-                obd_brw_set_free(set);
+                obd_brw_set_decref(set);
 
                 /* XXX should handle dst->o_size, dst->o_blocks here */
                 if (err) {
index ea205a6..515aa70 100644 (file)
@@ -523,6 +523,8 @@ static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
         LASSERT(desc->bd_brw_set != NULL);
         LASSERT(desc->bd_brw_set->brw_callback != NULL);
 
+        /* It's important that you don't use desc->bd_brw_set after this
+         * callback runs.  If you do, take a reference on it. */
         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
 
         /* We can't kunmap the desc from interrupt context, so we do it from
@@ -547,7 +549,17 @@ static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc)
 
         LASSERT(desc->bd_brw_set != NULL);
 
-        ptlrpc_abort_bulk(desc);
+        /* XXX reconcile this with ll_sync_brw_timeout() handling, and/or
+         *     just make osc_ptl_ev_hdlr() check desc->bd_flags for either
+         *     PTL_BULK_FL_RCVD or PTL_BULK_FL_SENT, and pass CB_PHASE_ABORT
+         *     to brw_callback() and do the rest of the cleanup there.  I
+         *     also think ll_sync_brw_timeout() is missing an PtlMEUnlink,
+         *     but I could be wrong.
+         */
+        if (ptlrpc_abort_bulk(desc)) {
+                EXIT;
+                return;
+        }
         obd_brw_set_del(desc);
         unmap_and_decref_bulk_desc(desc);
 
@@ -565,7 +577,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         struct ost_body *body;
         int rc, size[3] = {sizeof(*body)}, mapped = 0;
         struct obd_ioobj *iooptr;
-        void *nioptr;
+        struct niobuf_remote *nioptr;
         __u32 xid;
         ENTRY;
 
@@ -589,26 +601,27 @@ restart_bulk:
 
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
-        ost_pack_ioo(&iooptr, lsm, page_count);
+        ost_pack_ioo(iooptr, lsm, page_count);
         /* end almost identical to brw_write case */
 
         xid = ptlrpc_next_xid();       /* single xid for all pages */
 
         obd_kmap_get(page_count, 0);
 
-        for (mapped = 0; mapped < page_count; mapped++) {
+        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL) {
                         unmap_and_decref_bulk_desc(desc);
                         GOTO(out_req, rc = -ENOMEM);
                 }
 
-                bulk->bp_xid = xid;           /* single xid for all pages */
+                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
 
+                bulk->bp_xid = xid;           /* single xid for all pages */
                 bulk->bp_buf = kmap(pga[mapped].pg);
                 bulk->bp_page = pga[mapped].pg;
                 bulk->bp_buflen = PAGE_SIZE;
-                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, bulk->bp_xid);
         }
 
@@ -703,7 +716,7 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         struct ost_body *body;
         int rc, size[3] = {sizeof(*body)}, mapped = 0;
         struct obd_ioobj *iooptr;
-        void *nioptr;
+        struct niobuf_remote *nioptr;
         __u32 xid;
 #if CHECKSUM_BULK
         __u64 cksum = 0;
@@ -729,26 +742,29 @@ restart_bulk:
 
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
-        ost_pack_ioo(&iooptr, lsm, page_count);
+        ost_pack_ioo(iooptr, lsm, page_count);
         /* end almost identical to brw_read case */
 
         xid = ptlrpc_next_xid();       /* single xid for all pages */
 
         obd_kmap_get(page_count, 0);
 
-        for (mapped = 0; mapped < page_count; mapped++) {
+        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL) {
                         unmap_and_decref_bulk_desc(desc);
                         GOTO(out_req, rc = -ENOMEM);
                 }
 
-                bulk->bp_xid = xid;           /* single xid for all pages */
+                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
 
+                bulk->bp_xid = xid;           /* single xid for all pages */
                 bulk->bp_buf = kmap(pga[mapped].pg);
                 bulk->bp_page = pga[mapped].pg;
+                /* matching ptlrpc_bulk_get assert */
+                LASSERT(pga[mapped].count > 0);
                 bulk->bp_buflen = pga[mapped].count;
-                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, bulk->bp_xid);
                 ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
         }
@@ -808,6 +824,10 @@ restart_bulk:
 #define OSC_BRW_MAX_SIZE 65536
 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
 
+#warning "FIXME: make these values dynamic based on a get_info call at setup"
+#define OSC_BRW_MAX_SIZE 65536
+#define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
+
 static int osc_brw(int cmd, struct lustre_handle *conn,
                    struct lov_stripe_md *md, obd_count page_count,
                    struct brw_page *pga, struct obd_brw_set *set,
@@ -843,21 +863,20 @@ static int osc_brw(int cmd, struct lustre_handle *conn,
 /* Note: caller will lock/unlock, and set uptodate on the pages */
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 static int sanosc_brw_read(struct lustre_handle *conn,
-                           struct lov_stripe_md *md,
+                           struct lov_stripe_md *lsm,
                            obd_count page_count,
                            struct brw_page *pga,
                            struct obd_brw_set *set)
 {
         struct ptlrpc_request *request = NULL;
         struct ost_body *body;
-        struct niobuf_remote *remote, *nio_rep;
-        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+        struct niobuf_remote *nioptr;
         struct obd_ioobj *iooptr;
-        void *nioptr;
+        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
         ENTRY;
 
         size[1] = sizeof(struct obd_ioobj);
-        size[2] = page_count * sizeof(*remote);
+        size[2] = page_count * sizeof(*nioptr);
 
         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
                                   size, NULL);
@@ -867,19 +886,20 @@ static int sanosc_brw_read(struct lustre_handle *conn,
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
-        ost_pack_ioo(&iooptr, md, page_count);
+        ost_pack_ioo(iooptr, lsm, page_count);
 
         obd_kmap_get(page_count, 0);
 
-        for (mapped = 0; mapped < page_count; mapped++) {
+        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 LASSERT(PageLocked(pga[mapped].pg));
+                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
 
                 kmap(pga[mapped].pg);
-                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, 0);
         }
 
-        size[1] = page_count * sizeof(*remote);
+        size[1] = page_count * sizeof(*nioptr);
         request->rq_replen = lustre_msg_size(2, size);
 
         rc = ptlrpc_queue_wait(request);
@@ -896,25 +916,19 @@ static int sanosc_brw_read(struct lustre_handle *conn,
                 GOTO(out_unmap, rc = -EINVAL);
         }
 
-        for (j = 0; j < page_count; j++) {
-                ost_unpack_niobuf(&nioptr, &remote);
-        }
-
-        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
-        nio_rep = (struct niobuf_remote*)nioptr;
-
         /* actual read */
-        for (j = 0; j < page_count; j++) {
+        for (j = 0; j < page_count; j++, nioptr++) {
                 struct page *page = pga[j].pg;
                 struct buffer_head *bh;
                 kdev_t dev;
 
+                ost_unpack_niobuf(nioptr, nioptr);
                 /* got san device associated */
                 LASSERT(class_conn2obd(conn));
                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
 
                 /* hole */
-                if (!nio_rep[j].offset) {
+                if (!nioptr->offset) {
                         CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
                                         page->mapping->host->i_ino,
                                         page->index);
@@ -928,7 +942,7 @@ static int sanosc_brw_read(struct lustre_handle *conn,
 
                         clear_bit(BH_New, &bh->b_state);
                         set_bit(BH_Mapped, &bh->b_state);
-                        bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+                        bh->b_blocknr = (unsigned long)nioptr->offset;
 
                         clear_bit(BH_Uptodate, &bh->b_state);
 
@@ -940,8 +954,7 @@ static int sanosc_brw_read(struct lustre_handle *conn,
                          * one we mapped before, check it */
                         LASSERT(!test_bit(BH_New, &bh->b_state));
                         LASSERT(test_bit(BH_Mapped, &bh->b_state));
-                        LASSERT(bh->b_blocknr ==
-                                (unsigned long)nio_rep[j].offset);
+                        LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
 
                         /* wait it's io completion */
                         if (test_bit(BH_Lock, &bh->b_state))
@@ -976,21 +989,20 @@ out_unmap:
 }
 
 static int sanosc_brw_write(struct lustre_handle *conn,
-                            struct lov_stripe_md *md,
+                            struct lov_stripe_md *lsm,
                             obd_count page_count,
                             struct brw_page *pga,
                             struct obd_brw_set *set)
 {
         struct ptlrpc_request *request = NULL;
         struct ost_body *body;
-        struct niobuf_remote *remote, *nio_rep;
-        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+        struct niobuf_remote *nioptr;
         struct obd_ioobj *iooptr;
-        void *nioptr;
+        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
         ENTRY;
 
         size[1] = sizeof(struct obd_ioobj);
-        size[2] = page_count * sizeof(*remote);
+        size[2] = page_count * sizeof(*nioptr);
 
         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
                                   3, size, NULL);
@@ -1000,19 +1012,20 @@ static int sanosc_brw_write(struct lustre_handle *conn,
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
-        ost_pack_ioo(&iooptr, md, page_count);
+        ost_pack_ioo(iooptr, lsm, page_count);
 
         /* map pages, and pack request */
         obd_kmap_get(page_count, 0);
-        for (mapped = 0; mapped < page_count; mapped++) {
+        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 LASSERT(PageLocked(pga[mapped].pg));
+                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
 
                 kmap(pga[mapped].pg);
-                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, 0);
         }
 
-        size[1] = page_count * sizeof(*remote);
+        size[1] = page_count * sizeof(*nioptr);
         request->rq_replen = lustre_msg_size(2, size);
 
         rc = ptlrpc_queue_wait(request);
@@ -1029,19 +1042,13 @@ static int sanosc_brw_write(struct lustre_handle *conn,
                 GOTO(out_unmap, rc = -EINVAL);
         }
 
-        for (j = 0; j < page_count; j++) {
-                ost_unpack_niobuf(&nioptr, &remote);
-        }
-
-        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
-        nio_rep = (struct niobuf_remote*)nioptr;
-
         /* actual write */
-        for (j = 0; j < page_count; j++) {
+        for (j = 0; j < page_count; j++, nioptr++) {
                 struct page *page = pga[j].pg;
                 struct buffer_head *bh;
                 kdev_t dev;
 
+                ost_unpack_niobuf(nioptr, nioptr);
                 /* got san device associated */
                 LASSERT(class_conn2obd(conn));
                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
@@ -1053,7 +1060,7 @@ static int sanosc_brw_write(struct lustre_handle *conn,
                         LASSERT(!test_bit(BH_New, &page->buffers->b_state));
                         LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
                         LASSERT(page->buffers->b_blocknr ==
-                                (unsigned long)nio_rep[j].offset);
+                                (unsigned long)nioptr->offset);
                 }
                 bh = page->buffers;
 
@@ -1067,7 +1074,7 @@ static int sanosc_brw_write(struct lustre_handle *conn,
                 set_bit(BH_Mapped, &bh->b_state);
 
                 /* override the block nr */
-                bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+                bh->b_blocknr = (unsigned long)nioptr->offset;
 
                 /* we are about to write it, so set it
                  * uptodate/dirty
@@ -1099,30 +1106,9 @@ out_unmap:
 
         goto out_req;
 }
-#else
-static int sanosc_brw_read(struct lustre_handle *conn,
-                           struct lov_stripe_md *md,
-                           obd_count page_count,
-                           struct brw_page *pga,
-                           struct obd_brw_set *set)
-{
-        LBUG();
-        return 0;
-}
-
-static int sanosc_brw_write(struct lustre_handle *conn,
-                            struct lov_stripe_md *md,
-                            obd_count page_count,
-                            struct brw_page *pga,
-                            struct obd_brw_set *set)
-{
-        LBUG();
-        return 0;
-}
-#endif
 
 static int sanosc_brw(int cmd, struct lustre_handle *conn,
-                      struct lov_stripe_md *md, obd_count page_count,
+                      struct lov_stripe_md *lsm, obd_count page_count,
                       struct brw_page *pga, struct obd_brw_set *set,
                       struct obd_trans_info *oti)
 {
@@ -1138,10 +1124,10 @@ static int sanosc_brw(int cmd, struct lustre_handle *conn,
                         pages_per_brw = page_count;
 
                 if (cmd & OBD_BRW_WRITE)
-                        rc = sanosc_brw_write(conn, md, pages_per_brw,
+                        rc = sanosc_brw_write(conn, lsm, pages_per_brw,
                                               pga, set);
                 else
-                        rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set);
+                        rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga,set);
 
                 if (rc != 0)
                         RETURN(rc);
@@ -1152,6 +1138,7 @@ static int sanosc_brw(int cmd, struct lustre_handle *conn,
         RETURN(0);
 }
 #endif
+#endif
 
 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                        struct lustre_handle *parent_lock,
@@ -1178,7 +1165,7 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                              sizeof(extent), mode, lockh);
         if (rc == 1)
                 /* We already have a lock, and it's referenced */
-                RETURN(ELDLM_OK);
+                RETURN(ELDLM_LOCK_MATCHED);
 
         /* If we're trying to read, we also search for an existing PW lock.  The
          * VFS and page cache already protect us locally, so lots of readers/
@@ -1202,7 +1189,7 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                         ldlm_lock_addref(lockh, LCK_PR);
                         ldlm_lock_decref(lockh, LCK_PW);
 
-                        RETURN(ELDLM_OK);
+                        RETURN(ELDLM_LOCK_MATCHED);
                 }
         }
 
index 18a1b85..848336c 100644 (file)
@@ -249,14 +249,13 @@ static int ost_brw_read(struct ptlrpc_request *req)
 {
         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ptlrpc_bulk_desc *desc;
-        struct obd_ioobj *tmp1;
-        void *tmp2, *end2;
         struct niobuf_remote *remote_nb;
         struct niobuf_local *local_nb = NULL;
         struct obd_ioobj *ioo;
         struct ost_body *body;
         struct l_wait_info lwi;
         void *desc_priv = NULL;
+        void *end2;
         int cmd, i, j, objcount, niocount, size = sizeof(*body);
         int rc = 0;
 #if CHECKSUM_BULK
@@ -265,9 +264,9 @@ static int ost_brw_read(struct ptlrpc_request *req)
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
-        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
-        end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+        ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+        remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+        end2 = (char *)remote_nb + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
         cmd = OBD_BRW_READ;
@@ -282,15 +281,29 @@ static int ost_brw_read(struct ptlrpc_request *req)
         if (rc)
                 GOTO(out, req->rq_status = rc);
 
-        for (i = 0; i < objcount; i++) {
-                ost_unpack_ioo(&tmp1, &ioo);
-                if (tmp2 + ioo->ioo_bufcnt > end2) {
+        for (i = 0; i < objcount; i++, ioo++) {
+                ost_unpack_ioo(ioo, ioo);
+                if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
+                        CERROR("BRW: objid "LPX64" count %u larger than %u\n",
+                               ioo->ioo_id, ioo->ioo_bufcnt, (unsigned)
+                               ((struct niobuf_remote *)end2 - remote_nb));
                         LBUG();
-                        GOTO(out, rc = -EFAULT);
+                        GOTO(out, rc = -EINVAL);
                 }
-                for (j = 0; j < ioo->ioo_bufcnt; j++) {
-                        /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
-                        ost_unpack_niobuf(&tmp2, &remote_nb);
+                for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) {
+                        ost_unpack_niobuf(remote_nb, remote_nb);
+                        if (remote_nb->len == 0) {
+                                CERROR("zero len BRW: objid "LPX64" buf %u\n",
+                                       ioo->ioo_id, j);
+                                GOTO(out, rc = -EINVAL);
+                        }
+                        if (j && remote_nb->offset <= (remote_nb - 1)->offset) {
+                                CERROR("unordered BRW: objid "LPX64
+                                       " buf %u offset "LPX64" <= "LPX64"\n",
+                                       ioo->ioo_id, j, remote_nb->offset,
+                                       (remote_nb - 1)->offset);
+                                GOTO(out, rc = -EINVAL);
+                        }
                 }
         }
 
@@ -298,7 +311,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
         if (local_nb == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        /* The unpackers move tmp1 and tmp2, so reset them before using */
+        /* The unpackers move ioo and remote_nb, so reset them before using */
         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
@@ -363,9 +376,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ptlrpc_bulk_desc *desc;
-        struct obd_ioobj *tmp1;
-        void *tmp2, *end2;
         struct niobuf_remote *remote_nb;
+        void *end2;
         struct niobuf_local *local_nb = NULL;
         struct obd_ioobj *ioo;
         struct ost_body *body;
@@ -376,9 +388,9 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
-        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
-        end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+        ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+        remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+        end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
         cmd = OBD_BRW_WRITE;
@@ -386,15 +398,29 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
                 GOTO(out, req->rq_status = -EIO);
 
-        for (i = 0; i < objcount; i++) {
-                ost_unpack_ioo(&tmp1, &ioo);
-                if (tmp2 + ioo->ioo_bufcnt > end2) {
+        for (i = 0; i < objcount; i++, ioo++) {
+                ost_unpack_ioo(ioo, ioo);
+                if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
+                        CERROR("BRW: objid "LPX64" count %u larger than %u\n",
+                               ioo->ioo_id, ioo->ioo_bufcnt, (unsigned)
+                               ((struct niobuf_remote *)end2 - remote_nb));
                         LBUG();
-                        GOTO(out, rc = -EFAULT);
+                        GOTO(out, rc = -EINVAL);
                 }
-                for (j = 0; j < ioo->ioo_bufcnt; j++) {
-                        /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
-                        ost_unpack_niobuf(&tmp2, &remote_nb);
+                for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) {
+                        ost_unpack_niobuf(remote_nb, remote_nb);
+                        if (remote_nb->len == 0) {
+                                CERROR("zero len BRW: objid "LPX64" buf %u\n",
+                                       ioo->ioo_id, j);
+                                GOTO(out, rc = -EINVAL);
+                        }
+                        if (j && remote_nb->offset <= (remote_nb - 1)->offset) {
+                                CERROR("unordered BRW: objid "LPX64
+                                       " buf %u offset "LPX64" <= "LPX64"\n",
+                                       ioo->ioo_id, j, remote_nb->offset,
+                                       (remote_nb - 1)->offset);
+                                GOTO(out, rc = -EINVAL);
+                        }
                 }
         }
 
@@ -402,9 +428,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (local_nb == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        /* The unpackers move tmp1 and tmp2, so reset them before using */
+        /* The unpackers move ioo and remote_nb, so reset them before using */
         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+
         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
                                     remote_nb, local_nb, &desc_priv, oti);
 
@@ -505,26 +532,28 @@ static int ost_san_brw(struct ptlrpc_request *req, int alloc)
         struct obd_ioobj *ioo;
         struct ost_body *body;
         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
-        void *tmp1, *tmp2, *end2;
+        void *end2;
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
-        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
-        end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+        ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+        remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+        end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
-        
+
         cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ;
 
-        for (i = 0; i < objcount; i++) {
-                ost_unpack_ioo((void *)&tmp1, &ioo);
-                if (tmp2 + ioo->ioo_bufcnt > end2) {
-                        rc = -EFAULT;
-                        break;
+        for (i = 0; i < objcount; i++, ioo++) {
+                ost_unpack_ioo(ioo, ioo);
+                if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
+                        CERROR("BRW: objid "LPX64" count %u larger than %u\n",
+                               ioo->ioo_id, ioo->ioo_bufcnt, (unsigned)
+                               ((struct niobuf_remote *)end2 - remote_nb));
+                        GOTO(out, rc = -EINVAL);
                 }
-                for (j = 0; j < ioo->ioo_bufcnt; j++)
-                        ost_unpack_niobuf((void *)&tmp2, &remote_nb);
+                for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++)
+                        ost_unpack_niobuf(remote_nb, remote_nb);
         }
 
         size[1] = niocount * sizeof(*remote_nb);
@@ -532,12 +561,12 @@ static int ost_san_brw(struct ptlrpc_request *req, int alloc)
         if (rc)
                 GOTO(out, rc);
 
-        /* The unpackers move tmp1 and tmp2, so reset them before using */
-        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
-        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        /* The unpackers move ioo and remote_nb, so reset them before using */
+        ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+        remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
 
-        req->rq_status = obd_san_preprw(cmd, conn, objcount, tmp1,
-                                        niocount, tmp2);
+        req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo,
+                                        niocount, remote_nb);
 
         if (req->rq_status) {
                 rc = 0;
@@ -546,15 +575,9 @@ static int ost_san_brw(struct ptlrpc_request *req, int alloc)
 
         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
         res_nb = lustre_msg_buf(req->rq_reqmsg, 2);
-        for (i = 0; i < niocount; i++) {
-                /* this advances remote_nb */
-                ost_pack_niobuf((void **)&remote_nb,
-                                res_nb[i].offset,
-                                res_nb[i].len, /* 0 */
-                                res_nb[i].flags, /* 0 */
-                                res_nb[i].xid
-                                );
-        }
+        for (i = 0; i < niocount; i++, remote_nb++, res_nb++)
+                ost_pack_niobuf(remote_nb, res_nb->offset, res_nb->len,
+                                res_nb->flags, res_nb->xid);
 
         rc = 0;
 
index 70ea9e4..28ca368 100644 (file)
@@ -191,6 +191,7 @@ static void ptlbd_request(request_queue_t *q)
                 spin_unlock_irq(&io_request_lock);
 
                 /* XXX dunno if we're supposed to get this or not.. */
+                /* __make_request() changes READA to READ - Kris */
                 LASSERT(req->cmd != READA);
 
                 if ( req->cmd == READ )
@@ -198,7 +199,7 @@ static void ptlbd_request(request_queue_t *q)
                 else 
                         cmd = PTLBD_WRITE;
 
-                ptlbd_send_req(ptlbd, cmd, req->bh);
+                ptlbd_send_req(ptlbd, cmd, req);
 
                 spin_lock_irq(&io_request_lock);
 
@@ -234,7 +235,8 @@ int ptlbd_blk_init(void)
 
         for ( i = 0 ; i < PTLBD_MAX_MINOR ; i++) {
                 ptlbd_size_size[i] = 4096;
-                ptlbd_size[i] = (4096*2048) >> BLOCK_SIZE_BITS;
+                /* avoid integer overflow */
+                ptlbd_size[i] = (16*1024*((1024*1024) >> BLOCK_SIZE_BITS));
                 ptlbd_hardsect_size[i] = 4096;
                 ptlbd_max_sectors[i] = 2;
                 //RHism ptlbd_dev_varyio[i] = 0;
@@ -246,12 +248,9 @@ int ptlbd_blk_init(void)
 
 void ptlbd_blk_exit(void)
 {
-        int ret;
         ENTRY;
         blk_cleanup_queue(BLK_DEFAULT_QUEUE(PTLBD_MAJOR));
-        ret = unregister_blkdev(PTLBD_MAJOR, "ptlbd");
-        if ( ret )  /* XXX */
-                printk("unregister_blkdev() failed: %d\n", ret);
+        unregister_blkdev(PTLBD_MAJOR, "ptlbd");
 }
 
 #undef MAJOR_NR
index 67d0b85..8d957db 100644 (file)
@@ -83,10 +83,17 @@ static int ptlbd_cl_setup(struct obd_device *obddev, obd_count len, void *buf)
 
 static int ptlbd_cl_cleanup(struct obd_device *obddev)
 {
-//        struct ptlbd_obd *ptlbd = &obddev->u.ptlbd;
+        struct ptlbd_obd *ptlbd = &obddev->u.ptlbd;
         ENTRY;
 
-        CERROR("I should be cleaning things up\n");
+        if (!ptlbd)
+                RETURN(-ENOENT);
+
+        if (!ptlbd->bd_import.imp_connection)
+                RETURN(-ENOENT);
+
+        ptlrpc_cleanup_client(&ptlbd->bd_import);
+        ptlrpc_put_connection(ptlbd->bd_import.imp_connection);
 
         RETURN(0);
 }
index a95cc3f..e3fde99 100644 (file)
@@ -62,6 +62,7 @@ static void __exit ptlbd_exit(void)
         ENTRY;
         ptlbd_cl_exit();
         ptlbd_sv_exit();
+        ptlbd_blk_exit();
         EXIT;
 }
 
index 4daee83..d3e5083 100644 (file)
 #include <linux/lprocfs_status.h>
 #include <linux/obd_ptlbd.h>
 
+#define RSP_OK       0
+#define RSP_NOTOK   -1
+#define RQ_OK        0
+
 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
-                struct buffer_head *first_bh)
+                struct request *blkreq)
 {
+        struct buffer_head *first_bh = blkreq->bh;
         struct obd_import *imp = &ptlbd->bd_import;
         struct ptlbd_op *op;
         struct ptlbd_niob *niob, *niobs;
@@ -108,9 +113,13 @@ int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
         req->rq_level = imp->imp_level;
         rc = ptlrpc_queue_wait(req);
 
-        if ( rc == 0 ) {
-                rsp = lustre_msg_buf(req->rq_repmsg, 0);
-                /* XXX do stuff */
+        if ( rc != 0 ) {
+                blkreq->errors++;
+                GOTO(out_desc, rc);
+        }
+        rsp = lustre_msg_buf(req->rq_repmsg, 0);
+        if (rsp->r_status != RSP_OK) {
+                blkreq->errors += rsp->r_error_cnt;
         }
 
 out_desc:
@@ -130,11 +139,12 @@ static int ptlbd_bulk_timeout(void *data)
         RETURN(1);
 }
 
-void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, 
+int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, 
                 int page_count, struct list_head *page_list)
 {
         mm_segment_t old_fs;
         struct list_head *pos;
+        int status = RSP_OK;
         ENTRY;
 
         old_fs = get_fs();
@@ -145,19 +155,23 @@ void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
                 struct page *page = list_entry(pos, struct page, list);
                 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) + 
                         niobs->n_offset;
-
-                if ( op == PTLBD_READ )
-                        ret = filp->f_op->read(filp, page_address(page), 
-                                        niobs->n_length, &offset);
-                else
-                        ret = filp->f_op->write(filp, page_address(page), 
-                                        niobs->n_length, &offset);
+                if ( op == PTLBD_READ )
+                        ret = filp->f_op->read(filp, page_address(page),
+                                        niobs->n_length, &offset);
+                else
+                        ret = filp->f_op->write(filp, page_address(page),
+                                        niobs->n_length, &offset);
+                /* stop only when a transfer is short or fails */
+                if (ret != niobs->n_length) {
+                        status = ret;
+                        goto out;
+                }
 
                 niobs++;
         }
-
+out:
         set_fs(old_fs);
-        EXIT;
+        RETURN(status);
 }
 
 int ptlbd_parse_req(struct ptlrpc_request *req)
@@ -168,7 +182,8 @@ int ptlbd_parse_req(struct ptlrpc_request *req)
         struct ptlrpc_bulk_desc *desc;
         struct file *filp = req->rq_obd->u.ptlbd.filp;
         struct l_wait_info lwi;
-        int size[1], wait_flag, i, page_count, rc;
+        int size[1], wait_flag, i, page_count, rc, error_cnt = 0, 
+            status = RSP_OK;
         struct list_head *pos, *n;
         LIST_HEAD(tmp_pages);
         ENTRY;
@@ -199,16 +214,16 @@ int ptlbd_parse_req(struct ptlrpc_request *req)
                         GOTO(out_bulk, rc = -ENOMEM);
                 list_add(&bulk->bp_page->list, &tmp_pages);
 
-                /* 
-                 * XXX what about the block number? 
-                 */
                 bulk->bp_xid = niob->n_xid;
                 bulk->bp_buf = page_address(bulk->bp_page);
                 bulk->bp_buflen = niob->n_length;
         }
 
         if ( op->op_cmd == PTLBD_READ ) {
-                ptlbd_do_filp(filp, PTLBD_READ, niobs, page_count, &tmp_pages);
+                if ((status = ptlbd_do_filp(filp, PTLBD_READ, niobs, 
+                                          page_count, &tmp_pages)) < 0) {
+                        error_cnt++;
+                }
                 rc = ptlrpc_bulk_put(desc);
                 wait_flag = PTL_BULK_FL_SENT;
         } else {
@@ -232,12 +247,17 @@ int ptlbd_parse_req(struct ptlrpc_request *req)
         if ( rsp == NULL )
                 GOTO(out, rc = -EINVAL);
         
-        ptlbd_do_filp(filp, PTLBD_WRITE, niobs, page_count, &tmp_pages);
+        if ( op->op_cmd == PTLBD_WRITE ) {
+                if ((status = ptlbd_do_filp(filp, PTLBD_WRITE, niobs, 
+                                           page_count, &tmp_pages)) < 0) {
+                        error_cnt++;
+                }
+        }
 
-        rsp->r_error_cnt = 42;
-        rsp->r_status = 69;
+        rsp->r_error_cnt = error_cnt;
+        rsp->r_status = status;                         /* I/O status */
+        req->rq_status = RQ_OK ; /* XXX */              /* ptlbd req status */
 
-        req->rq_status = 0; /* XXX */
         ptlrpc_reply(req->rq_svc, req);
 
 out_bulk:
index 793354d..e4a7046 100644 (file)
@@ -32,6 +32,8 @@
 #include <linux/lprocfs_status.h>
 #include <linux/obd_ptlbd.h>
 
+#define BACKING_FILE    "/tmp/ptlbd-backing-file-la-la-la"
+
 static int ptlbd_sv_already_setup = 1;
 
 static int ptlbd_sv_setup(struct obd_device *obddev, obd_count len, void *buf)
@@ -40,8 +42,9 @@ static int ptlbd_sv_setup(struct obd_device *obddev, obd_count len, void *buf)
         int rc;
         ENTRY;
 
-        ptlbd->filp = filp_open("/tmp/ptlbd-backing-file-la-la-la", 
-                                        O_RDWR|O_CREAT, 0600);
+        ptlbd->filp = filp_open(BACKING_FILE,
+                                        O_RDWR|O_CREAT|O_LARGEFILE, 0600);
+
         if ( IS_ERR(ptlbd->filp) )
                 RETURN(PTR_ERR(ptlbd->filp));
 
index 7d80d5f..998c462 100644 (file)
@@ -243,12 +243,13 @@ int ll_brw_sync_wait(struct obd_brw_set *set, int phase)
         int rc = 0;
         ENTRY;
 
+        obd_brw_set_addref(set);
         switch(phase) {
         case CB_PHASE_START:
                 lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ll_sync_brw_timeout,
                                        ll_sync_brw_intr, set);
                 rc = l_wait_event(set->brw_waitq,
-                                  atomic_read(&set->brw_refcount) == 0, &lwi);
+                                  atomic_read(&set->brw_desc_count) == 0, &lwi);
 
                 list_for_each_safe(tmp, next, &set->brw_desc_head) {
                         struct ptlrpc_bulk_desc *desc =
@@ -259,12 +260,13 @@ int ll_brw_sync_wait(struct obd_brw_set *set, int phase)
                 }
                 break;
         case CB_PHASE_FINISH:
-                if (atomic_dec_and_test(&set->brw_refcount))
+                if (atomic_dec_and_test(&set->brw_desc_count))
                         wake_up(&set->brw_waitq);
                 break;
         default:
                 LBUG();
         }
+        obd_brw_set_decref(set);
 
         RETURN(rc);
 }
@@ -294,6 +296,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
                 RETURN(NULL);
         }
 
+        request->rq_timeout = obd_timeout;
         request->rq_level = LUSTRE_CONN_FULL;
         request->rq_type = PTL_RPC_MSG_REQUEST;
         request->rq_import = imp;
@@ -730,7 +733,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                                        interrupted_request, req);
         } else {
                 DEBUG_REQ(D_NET, req, "-- sleeping");
-                lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_request,
+                lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
                                        interrupted_request, req);
         }
 #ifdef __KERNEL__
@@ -816,9 +819,11 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 }
                 imp->imp_level = LUSTRE_CONN_RECOVD;
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
-                rc = imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN);
-                if (rc)
-                        LBUG();
+                if (imp->imp_recover != NULL) {
+                        rc = imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN);
+                        if (rc)
+                                LBUG();
+                }
                 GOTO(out, rc = -EIO);
         }
 
@@ -917,6 +922,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import)
 {
         unsigned long flags;
         struct list_head *tmp, *n;
+        ENTRY;
 
         /* Make sure that no new requests get processed for this import.
          * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
@@ -949,4 +955,5 @@ void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import)
                         req->rq_import = NULL;
                 wake_up(&req->rq_wait_for_rep);
         }
+        EXIT;
 }
index 62a76c4..3b1d32f 100644 (file)
@@ -401,12 +401,22 @@ int ptlrpc_register_bulk_put(struct ptlrpc_bulk_desc *desc)
 
 int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
 {
+        int rc1, rc2;
         /* This should be safe: these handles are initialized to be
          * invalid in ptlrpc_prep_bulk() */
-        PtlMDUnlink(desc->bd_md_h);
-        PtlMEUnlink(desc->bd_me_h);
+        rc1 = PtlMDUnlink(desc->bd_md_h);
+        if (rc1 != PTL_OK)
+                CERROR("PtlMDUnlink: %d\n", rc1);
+        rc2 = PtlMEUnlink(desc->bd_me_h);
+        if (rc2 != PTL_OK)
+                CERROR("PtlMEUnlink: %d\n", rc2);
+
+        return rc1 ? rc1 : rc2;
+}
 
-        return 0;
+void obd_brw_set_addref(struct obd_brw_set *set)
+{
+        atomic_inc(&set->brw_refcount);
 }
 
 void obd_brw_set_add(struct obd_brw_set *set, struct ptlrpc_bulk_desc *desc)
@@ -414,14 +424,14 @@ void obd_brw_set_add(struct obd_brw_set *set, struct ptlrpc_bulk_desc *desc)
         LASSERT(list_empty(&desc->bd_set_chain));
 
         ptlrpc_bulk_addref(desc);
-        atomic_inc(&set->brw_refcount);
+        atomic_inc(&set->brw_desc_count);
         desc->bd_brw_set = set;
         list_add(&desc->bd_set_chain, &set->brw_desc_head);
 }
 
 void obd_brw_set_del(struct ptlrpc_bulk_desc *desc)
 {
-        atomic_dec(&desc->bd_brw_set->brw_refcount);
+        atomic_dec(&desc->bd_brw_set->brw_desc_count);
         list_del_init(&desc->bd_set_chain);
         ptlrpc_bulk_decref(desc);
 }
@@ -435,13 +445,14 @@ struct obd_brw_set *obd_brw_set_new(void)
         if (set != NULL) {
                 init_waitqueue_head(&set->brw_waitq);
                 INIT_LIST_HEAD(&set->brw_desc_head);
-                atomic_set(&set->brw_refcount, 0);
+                atomic_set(&set->brw_refcount, 1);
+                atomic_set(&set->brw_desc_count, 0);
         }
 
         return set;
 }
 
-void obd_brw_set_free(struct obd_brw_set *set)
+static void obd_brw_set_free(struct obd_brw_set *set)
 {
         struct list_head *tmp, *next;
         ENTRY;
@@ -459,6 +470,14 @@ void obd_brw_set_free(struct obd_brw_set *set)
         return;
 }
 
+/* Drop one reference on @set: atomically decrement set->brw_refcount and,
+ * when that decrement brings the count to zero, release the set through
+ * obd_brw_set_free().  This is the counterpart of obd_brw_set_addref(),
+ * which increments the same counter. */
+void obd_brw_set_decref(struct obd_brw_set *set)
+{
+        ENTRY;
+        /* atomic_dec_and_test() is true only for the caller that drops the
+         * last reference, so the set is freed exactly once. */
+        if (atomic_dec_and_test(&set->brw_refcount))
+                obd_brw_set_free(set);
+        EXIT;
+}
+
 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
 {
         if (req->rq_repmsg == NULL) {
index 0f13acf..c0d5ba5 100644 (file)
@@ -255,10 +255,11 @@ EXPORT_SYMBOL(ptlrpc_error);
 EXPORT_SYMBOL(ptlrpc_resend_req);
 EXPORT_SYMBOL(ptl_send_rpc);
 EXPORT_SYMBOL(ptlrpc_link_svc_me);
-EXPORT_SYMBOL(obd_brw_set_free);
 EXPORT_SYMBOL(obd_brw_set_new);
 EXPORT_SYMBOL(obd_brw_set_add);
 EXPORT_SYMBOL(obd_brw_set_del);
+EXPORT_SYMBOL(obd_brw_set_decref);
+EXPORT_SYMBOL(obd_brw_set_addref);
 
 /* client.c */
 EXPORT_SYMBOL(ptlrpc_init_client);
index 112d01d..3338445 100644 (file)
@@ -216,7 +216,7 @@ static int handle_incoming_request(struct obd_device *obddev,
         }
 
         CDEBUG(D_RPCTRACE, "Handling RPC ni:pid:xid:nid:opc %d:%d:"LPU64":"
-               LPX64":%d\n", rqbd->rqbd_srv_ni - &svc->srv_interfaces[0],
+               LPX64":%d\n", (int)(rqbd->rqbd_srv_ni - svc->srv_interfaces),
                NTOH__u32(request->rq_reqmsg->status), request->rq_xid,
                event->initiator.nid, NTOH__u32(request->rq_reqmsg->opc));
 
@@ -334,8 +334,9 @@ static int ptlrpc_main(void *arg)
 
         /* And now, loop forever on requests */
         while (1) {
-                wait_event(svc->srv_waitq,
-                           ptlrpc_check_event(svc, thread, event));
+                struct l_wait_info lwi = { 0 };
+                l_wait_event(svc->srv_waitq,
+                             ptlrpc_check_event(svc, thread, event), &lwi);
 
                 if (thread->t_flags & SVC_STOPPING) {
                         spin_lock(&svc->srv_lock);
@@ -377,12 +378,15 @@ out:
 static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                                struct ptlrpc_thread *thread)
 {
+        struct l_wait_info lwi = { 0 };
+
         spin_lock(&svc->srv_lock);
         thread->t_flags = SVC_STOPPING;
         spin_unlock(&svc->srv_lock);
 
         wake_up(&svc->srv_waitq);
-        wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED));
+        l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
+                     &lwi);
 }
 
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
@@ -404,6 +408,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                         char *name)
 {
+        struct l_wait_info lwi = { 0 };
         struct ptlrpc_svc_data d;
         struct ptlrpc_thread *thread;
         int rc;
@@ -434,7 +439,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                 OBD_FREE(thread, sizeof(*thread));
                 RETURN(rc);
         }
-        wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING);
+        l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
 
         RETURN(0);
 }
index 6b40c41..3657c7a 100644 (file)
@@ -71,16 +71,21 @@ rm -rf $RPM_BUILD_ROOT
 cd $RPM_BUILD_DIR/lustre-%{version}
 ./configure --with-linux='%{linuxdir}' --with-portals='%{portalsdir}' --with-portalslib='%{portalslibdir}'
 make
+
+%ifarch i386
 cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version}
 ./configure --with-lib --with-portals='%{portalsdir}' --with-portalslib='%{portalslibdir}'
 make
+%endif
 
 %install
 cd $RPM_BUILD_DIR/lustre-%{version}
 make install prefix=$RPM_BUILD_ROOT
 
+%ifarch i386
 cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version}
 make install prefix=$RPM_BUILD_ROOT
+%endif
 
 
 # Create the pristine source directory.
@@ -140,6 +145,7 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %files -n lustre-source
 %attr(-, root, root) /usr/src/lustre-%{version}
 
+%ifarch i386
 %files -n liblustre
 %attr(-, root, root) /lib/lustre
 %attr(-, root, root) /lib/lustre/liblov.a
@@ -158,6 +164,7 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %attr(-, root, root) /usr/sbin/lconf
 %attr(-, root, root) /usr/sbin/lmc
 %attr(-, root, root) /usr/sbin/llanalyze
+%endif
 
 
 %files -n lustre-ldap
index 6d7b7f4..c0427fd 100644 (file)
@@ -2,13 +2,13 @@
 
 config=${1:-ba-echo.xml}
 
+LMC_REAL="${LMC:-../utils/lmc} -m $config"
 LMC="save_cmd"
-LMC_REAL="../../lustre/utils/lmc -m $config"
 
 TCPBUF=1048576
 OST=${OST:-ba-ost-1}
-CLIENT=client
+CLIENT=`hostname`
+
 UUIDLIST=${UUIDLIST:-/usr/local/admin/ba-ost/UUID.txt}
 
 h2tcp () {
@@ -29,7 +29,7 @@ OST_UUID=`awk "/$OST / { print \\$3 }" $UUIDLIST`
 
 # server node
 ${LMC} --add net --node $OST --tcpbuf $TCPBUF --nid $OST --nettype tcp
-${LMC} --add ost --node $OST --ost ost1 --obdtype=obdecho $OST_UUID 
+${LMC} --add ost --node $OST --ost ost1 --osdtype=obdecho $OST_UUID 
 
 # osc on client
 ${LMC} --add echo_client --node $CLIENT --ost ost1
index 3299011..f529fb0 100644 (file)
@@ -18,6 +18,7 @@ int main(int argc, char **argv)
         int fd;
         char *buf;
         int blocks;
+        long len;
         struct stat st;
         int rc;
 
@@ -41,15 +42,16 @@ int main(int argc, char **argv)
         printf("directio on %s for %dx%lu blocks \n", argv[1], blocks,
                st.st_blksize);
 
-        buf = mmap(0, blocks * st.st_blksize, PROT_READ|PROT_WRITE,
-                   MAP_PRIVATE|MAP_ANON, 0, 0);
+        len = blocks * st.st_blksize;
+        buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0);
         if (!buf) {
                 printf("No memory %s\n", strerror(errno));
                 return 1;
         }
 
-        rc = write(fd, buf, blocks * st.st_blksize);
-        if (rc != blocks * st.st_blksize) {
+        memset(buf, 0xba, len);
+        rc = write(fd, buf, len);
+        if (rc != len) {
                 printf("Write error %s (rc = %d)\n", strerror(errno), rc);
                 return 1;
         }
@@ -59,8 +61,8 @@ int main(int argc, char **argv)
                 return 1;
         }
 
-        rc = read(fd, buf, blocks * st.st_blksize);
-        if (rc != blocks * st.st_blksize) {
+        rc = read(fd, buf, len);
+        if (rc != len) {
                 printf("Read error: %s (rc = %d)\n", strerror(errno), rc);
                 return 1;
         }
index 38e79c8..20c8c20 100755 (executable)
@@ -3,13 +3,11 @@ TMP=${TMP:-/tmp}
 LCMD=$TMP/lkcd-cmds-`hostname`
 echo "Storing LKCD module info in $LCMD"
 cat /tmp/ogdb-`hostname` | while read JUNK M JUNK; do
-       DIR=`dirname $M`
-       DIR=`cd $PWD/../$DIR; pwd`
-       MOD="$DIR/`basename $M`"
+       MOD="../$M"
        MAP=`echo $MOD | sed -e 's/\.o$/.map/'`
-       MODNAME=`basename $M | sed -e 's/\.o$//'`
+       MODNAME=`basename $MOD | sed -e 's/\.o$//'`
 
        nm $MOD > $MAP
-       echo namelist -a $MOD  | tee -a $LCMD
-       echo symtab -a $MAP $MODNAME | tee -a $LCMD
+       echo namelist -a $PWD/$MOD  | tee -a $LCMD
+       echo symtab -a $PWD/$MAP $MODNAME | tee -a $LCMD
 done
index cd87766..22d7550 100755 (executable)
@@ -11,5 +11,5 @@ if [ ! -f $config ]; then
    sh $mkconfig $config || exit 1
 fi
 
-${LCONF} --cleanup echo.xml
+${LCONF} --cleanup $NAME.xml
 
index cc4b06d..1294b13 100644 (file)
@@ -62,9 +62,8 @@ int main(int argc, char *argv[])
                         pid_t ret;
 
                         ret = waitpid(0, &status, 0);
-                        if (ret == 0) {
+                        if (ret == 0)
                                 continue;
-                        }
 
                         if (ret < 0) {
                                 fprintf(stderr, "error: %s: wait - %s\n",
@@ -85,9 +84,8 @@ int main(int argc, char *argv[])
                                                 argv[0], ret, err);
                                 if (!rc)
                                         rc = err;
-
-                                live_threads--;
                         }
+                        live_threads--;
                 }
         } else {
                 if (threads)
@@ -115,7 +113,8 @@ int main(int argc, char *argv[])
                                 rc = errno;
                                 break;
                         }
-                        if (ioctl(fd, LL_IOC_SETFLAGS, &ioctl_flags) < 0) {
+                        if (ioctl(fd, LL_IOC_SETFLAGS, &ioctl_flags) < 0 &&
+                            errno != ENOTTY) {
                                 fprintf(stderr, "ioctl(): %s\n",
                                         strerror(errno));
                                 rc = errno;
diff --git a/lustre/tests/runas.c b/lustre/tests/runas.c
new file mode 100644 (file)
index 0000000..3d29f1b
--- /dev/null
@@ -0,0 +1,133 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#define DEBUG 0
+
+// Print the command-line synopsis on stderr, then terminate via exit(-1).
+void
+Usage_and_abort()
+{
+        static const char synopsis[] =
+                "Usage: runas -u user_id [ -g grp_id ] command_to_be_run \n";
+
+        fputs(synopsis, stderr);
+        exit(-1);
+}
+
+// Usage: runas -u user_id [ -g grp_id ] "command_to_be_run"
+// return: the return value of "command_to_be_run"
+// NOTE: a return value of -1 may come either from this program itself or
+// from "command_to_be_run"
+
+// root can run "runas" without any extra setup
+// for other users, running "runas" requires chmod 6755 "command_to_be_run"
+
+// Parse a non-negative decimal id from str into *idp.
+// Returns 0 on success, -1 if str is empty, has trailing junk, is out of
+// range for a long, or is negative.
+static int
+parse_id(const char *str, long *idp)
+{
+        char *end;
+
+        errno = 0;
+        *idp = strtol(str, &end, 10);
+        if (end == str || *end != '\0' || errno != 0 || *idp < 0)
+                return -1;
+        return 0;
+}
+
+// Entry point: parse -u/-g, join the remaining arguments into one command
+// line, switch the real and effective group and user ids, run the command
+// through system() and return its exit status.
+// Returns -1 if system() itself fails or the command did not exit normally;
+// exits with -1 on usage errors, an over-long command or a failed id change.
+int
+main(int argc, char **argv)
+{
+        char command[1024];
+        size_t used = 0;
+        int status;
+        int c, i;
+        int gid_is_set = 0;
+        int uid_is_set = 0;
+        long id;
+        uid_t user_id = 0;
+        gid_t grp_id = 0;
+
+        if (argc == 1)
+                Usage_and_abort();
+
+        // get UID and GID
+        while ((c = getopt(argc, argv, "u:g:h")) != -1) {
+                switch (c) {
+                case 'u':
+                        // reject garbage instead of silently mapping it to 0
+                        if (parse_id(optarg, &id) != 0) {
+                                fprintf(stderr, "Bad user id \"%s\".\n",
+                                        optarg);
+                                Usage_and_abort();
+                        }
+                        user_id = (uid_t)id;
+                        uid_is_set = 1;
+                        // the group id defaults to the user id unless -g
+                        // has already been given
+                        if (!gid_is_set)
+                                grp_id = user_id;
+                        break;
+
+                case 'g':
+                        if (parse_id(optarg, &id) != 0) {
+                                fprintf(stderr, "Bad group id \"%s\".\n",
+                                        optarg);
+                                Usage_and_abort();
+                        }
+                        grp_id = (gid_t)id;
+                        gid_is_set = 1;
+                        break;
+
+                case 'h':
+                        Usage_and_abort();
+                        break;
+
+                default:
+                        // unknown options are deliberately ignored; the
+                        // explicit break keeps the label valid C
+                        break;
+                }
+        }
+
+        if (!uid_is_set)
+                Usage_and_abort();
+
+        if (optind == argc) {
+                fprintf(stderr, "Bad parameters.\n");
+                Usage_and_abort();
+        }
+
+        // assemble the command; each word is followed by one space, and
+        // anything that would not fit in the buffer is refused
+        for (i = optind; i < argc; i++) {
+                int n = snprintf(command + used, sizeof(command) - used,
+                                 "%s ", argv[i]);
+
+                if (n < 0 || (size_t)n >= sizeof(command) - used) {
+                        fprintf(stderr, "%s: command too long\n", argv[0]);
+                        exit(-1);
+                }
+                used += (size_t)n;
+        }
+
+#if DEBUG
+        system("whoami");
+#endif
+
+        // set GID
+        status = setregid(grp_id, grp_id);
+        if (status == -1) {
+                fprintf(stderr, "Cannot change grp_ID to %d, errno=%d (%s)\n",
+                        (int)grp_id, errno, strerror(errno));
+                exit(-1);
+        }
+
+        // set UID
+        status = setreuid(user_id, user_id);
+        if (status == -1) {
+                fprintf(stderr, "Cannot change user_ID to %d, errno=%d (%s)\n",
+                        (int)user_id, errno, strerror(errno));
+                exit(-1);
+        }
+
+#if DEBUG
+        system("whoami");
+#endif
+
+        fprintf(stdout, "running as USER(%d), Grp (%d):  \"%s\" \n",
+                (int)user_id, (int)grp_id, command);
+        // flush now so the banner is not emitted after the child's output
+        fflush(stdout);
+
+        // run the command
+        status = system(command);
+
+        // pass the return code of command_to_be_run out of this wrapper
+        if (status == -1) {
+                fprintf(stderr, "%s: system() command failed to run\n",
+                        argv[0]);
+        } else if (!WIFEXITED(status)) {
+                // WEXITSTATUS is only meaningful after a normal exit; do not
+                // let an abnormal termination be reported as success
+                fprintf(stderr, "[%s #%d] \"%s\" terminated abnormally "
+                        "(wait status 0x%x).\n", argv[0], (int)user_id,
+                        argv[optind], (unsigned int)status);
+                status = -1;
+        } else {
+                status = WEXITSTATUS(status);
+                fprintf(stderr, "[%s #%d] \"%s\" returns %d (%s).\n", argv[0],
+                        (int)user_id, argv[optind], status, strerror(status));
+        }
+
+        return (status);
+}
+
index 5c4d47d..fdaf82e 100644 (file)
@@ -146,7 +146,7 @@ $START
 
 log '== mkdir .../d7; mcreate .../d7/f2; echo foo > .../d7/f2 = test 7b'
 $MCREATE $DIR/d7/f2
-log -n foo > $DIR/d7/f2
+echo -n foo > $DIR/d7/f2
 [ "`cat $DIR/d7/f2`" = "foo" ] || error
 $CHECKSTAT -t file -s 3 $DIR/d7/f2 || error
 pass
@@ -312,6 +312,7 @@ pass
 $CLEAN
 $START
 
+
 log '== O_CREAT|O_EXCL in subdir ====================== test 23'
 mkdir $DIR/d23
 $TOEXCL $DIR/d23/f23
index 8e95654..a4930de 100644 (file)
@@ -106,8 +106,35 @@ for C in a b c d e f g h i j k l; do
 done
 [ "`cat $MOUNT1/f11`" = "abcdefghijkl" ] && pass || error
        
-rm -f $MOUNT1/f[0-9]* $MOUNT1/lnk
+echo "test 12: file length and contents across mounts"
+dd if=$SHELL of=$MOUNT1/f12 bs=4096 count=1
+$CHECKSTAT -s 4096 $MOUNT1/f12 $MOUNT2/f12 || error
+dd if=$SHELL bs=4096 count=1 |                                 \
+       md5sum - $MOUNT1/f12 $MOUNT2/f12 | (                    \
+               read GOODSUM DASH;                              \
+               while read SUM FILE ; do                        \
+                       [ $SUM == $GOODSUM ] || exit 2;         \
+               done; ) || error
+
+echo "test 13: open(,O_TRUNC,), close() across mounts"
+dd if=$SHELL of=$MOUNT1/f13 bs=4096 count=1
+> $MOUNT1/f13
+$CHECKSTAT -s 0 $MOUNT1/f13 $MOUNT2/f13 || error
+
+echo "test 14: file extension while holding the fd open"
+> $MOUNT1/f14
+# ugh.
+touch $MOUNT1/f14-start
+sh -c "
+  echo -n a;
+  mv $MOUNT1/f14-start $MOUNT1/f14-going;
+  while [ -f $MOUNT1/f14-going ] ; do sleep 1; done;
+    "  >> $MOUNT1/f14 &
+while [ -f $MOUNT1/f14-start ] ; do sleep 1; done;
+$CHECKSTAT -s 1 $MOUNT1/f14 $MOUNT2/f14 || error
+rm $MOUNT1/f14-going
 
+rm -f $MOUNT1/f[0-9]* $MOUNT1/lnk
 $CLEAN
 
 exit
index 196f32c..396f3b0 100644 (file)
@@ -5,6 +5,8 @@
 #include <stdlib.h>
 #include <errno.h>
 #include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/stat.h>
 
 // not correctly in the headers yet!!
 //#define O_DIRECT 0
@@ -12,7 +14,6 @@
 #define O_DIRECT        040000 /* direct disk access hint */
 #endif
 
-#define BLOCKSIZE 4096
 #define CERROR(fmt, arg...) fprintf(stderr, fmt, ## arg)
 #ifndef __u64
 #define __u64 long long
@@ -91,6 +92,7 @@ int main(int argc, char **argv)
        long long count, last, offset;
        long pg_vec, len;
        long long objid = 3;
+       struct stat st;
        int flags = 0;
        int cmd = 0;
        char *end;
@@ -131,8 +133,6 @@ int main(int argc, char **argv)
                        usage(argv[0]);
                }
        }
-       len = pg_vec * BLOCKSIZE;
-       last = (long long)count * len;
 
        if (argc >= 6) {
                objid = strtoull(argv[5], &end, 0);
@@ -147,13 +147,6 @@ int main(int argc, char **argv)
               argv[0], flags & O_DIRECT ? "directio" : "i/o",
               argv[1], objid, count, pg_vec);
 
-        buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0);
-        if (!buf) {
-                fprintf(stderr, "%s: no buffer memory %s\n",
-                       argv[0], strerror(errno));
-                return 2;
-        }
-
         fd = open(argv[1], flags | O_LARGEFILE);
         if (fd == -1) {
                 fprintf(stderr, "%s: cannot open %s:  %s\n", argv[0],
@@ -161,23 +154,41 @@ int main(int argc, char **argv)
                 return 3;
         }
 
+       rc = fstat(fd, &st);
+       if (rc < 0) {
+               fprintf(stderr, "%s: cannot stat %s: %s\n", argv[0],
+                       argv[1], strerror(errno));
+               return 4;
+       }
+
+       len = pg_vec * st.st_blksize;
+       last = (long long)count * len;
+
+        buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0);
+        if (!buf) {
+                fprintf(stderr, "%s: no buffer memory %s\n",
+                       argv[0], strerror(errno));
+                return 2;
+        }
+
        for (offset = 0; offset < last && cmd & WRITE; offset += len) {
                int i;
 
-               for (i = 0; i < len; i += BLOCKSIZE)
-                       page_debug_setup(buf + i, BLOCKSIZE, offset + i, objid);
+               for (i = 0; i < len; i += st.st_blksize)
+                       page_debug_setup(buf + i, st.st_blksize, offset + i,
+                                        objid);
 
                rc = write(fd, buf, len);
 
-               for (i = 0; i < len; i += BLOCKSIZE) {
-                       if (page_debug_check("write", buf + i, BLOCKSIZE,
+               for (i = 0; i < len; i += st.st_blksize) {
+                       if (page_debug_check("write", buf + i, st.st_blksize,
                                             offset + i, objid))
                                return 10;
                }
 
                if (rc != len) {
-                       fprintf(stderr, "%s: write error: %s, rc %d\n",
-                               argv[0], strerror(errno), rc);
+                       fprintf(stderr, "%s: write error: %s, rc %d != %ld\n",
+                               argv[0], strerror(errno), rc, len);
                        return 4;
                }
        }
@@ -193,13 +204,13 @@ int main(int argc, char **argv)
 
                rc = read(fd, buf, len);
                if (rc != len) {
-                       fprintf(stderr, "%s: read error: %s, rc %d\n",
-                               argv[0], strerror(errno), rc);
+                       fprintf(stderr, "%s: read error: %s, rc %d != %ld\n",
+                               argv[0], strerror(errno), rc, len);
                        return 6;
                }
 
-               for (i = 0; i < len; i += BLOCKSIZE) {
-                       if (page_debug_check("read", buf + i, BLOCKSIZE,
+               for (i = 0; i < len; i += st.st_blksize) {
+                       if (page_debug_check("read", buf + i, st.st_blksize,
                                             offset + i, objid))
                                return 11;
                }
index 1efbd8c..a143647 100644 (file)
@@ -76,7 +76,7 @@ command_t cmdlist[] = {
         {"add_uuid", jt_obd_add_uuid, 0, "associate a UUID with a nid\n"
          "usage: add_uuid <uuid> <nid> <net_type>"},
         {"close_uuid", jt_obd_close_uuid, 0, "disconnect a UUID\n"
-         "usage: close_uuid <uuid>)"},
+         "usage: close_uuid <uuid> <net-type>)"},
         {"del_uuid", jt_obd_del_uuid, 0, "delete a UUID association\n"
          "usage: del_uuid <uuid>"},
         {"add_route", jt_ptl_add_route, 0,
index c4ecc42..95e5445 100644 (file)
@@ -1958,7 +1958,7 @@ int jt_obd_close_uuid(int argc, char **argv)
         struct obd_ioctl_data data;
 
         if (argc != 3) {
-                fprintf(stderr, "usage: %s <uuid>\n", argv[0]);
+                fprintf(stderr, "usage: %s <uuid> <net-type>\n", argv[0]);
                 return 0;
         }
 
index 3363824..4373071 100644 (file)
@@ -70,13 +70,13 @@ parse_kmg (uint64_t *valp, char *str)
 }
 
 void
-usage (char *cmdname, int help) 
+usage (char *cmdname, int help)
 {
         char *name = strrchr (cmdname, '/');
-        
+
         if (name == NULL)
                 name = cmdname;
-        
+
         fprintf (help ? stdout : stderr,
                  "usage: %s -d device -s size -o offset [-i id][-n reps][-l] oid\n",
                  name);
@@ -85,32 +85,32 @@ usage (char *cmdname, int help)
 int
 exponential_modulus (int i, int base)
 {
-       int   top = base;
-       int   mod = 1;
-       
-       for (;;) {
-               if (i < top)
-                       return (i%mod == 0);
-               
-               mod = top;
-               top *= base;
-       }
+        int   top = base;
+        int   mod = 1;
+
+        for (;;) {
+                if (i < top)
+                        return (i%mod == 0);
+
+                mod = top;
+                top *= base;
+        }
 }
 
 int
-main (int argc, char **argv) 
+main (int argc, char **argv)
 {
         uint64_t              bid = (((uint64_t)gethostid()) << 32) | getpid ();
         int                   set_bid = 0;
         uint64_t              oid;
-       int                   setup = 0;
+        int                   setup = 0;
         int                   device = -1;
-       int                   npeers = 0;
+        int                   npeers = 0;
         int                   reps = 1;
         char                  hostname[128];
         struct obdio_conn    *conn;
-       struct obdio_barrier *b;
-       char                 *end;
+        struct obdio_barrier *b;
+        char                 *end;
         uint64_t              val;
         int                   rc;
         int                   c;
@@ -119,13 +119,13 @@ main (int argc, char **argv)
         memset (hostname, 0, sizeof (hostname));
         gethostname (hostname, sizeof (hostname));
         hostname[sizeof(hostname) - 1] = 0;
-        
+
         while ((c = getopt (argc, argv, "hsi:d:n:p:")) != -1)
                 switch (c) {
                 case 'h':
                         usage (argv[0], 1);
                         return (0);
-                        
+
                 case 'i':
                         bid = strtoll (optarg, &end, 0);
                         if (end == optarg || *end != 0) {
@@ -135,11 +135,11 @@ main (int argc, char **argv)
                         }
                         set_bid = 1;
                         break;
-                        
+
                 case 's':
-                       setup = 1;
+                        setup = 1;
                         break;
-                        
+
                 case 'd':
                         device = strtol (optarg, &end, 0);
                         if (end == optarg || *end != 0 || device < 0) {
@@ -160,7 +160,7 @@ main (int argc, char **argv)
 
                 case 'p':
                         npeers = strtol (optarg, &end, 0);
-                       if (end == optarg || *end != 0 || npeers <= 0) {
+                        if (end == optarg || *end != 0 || npeers <= 0) {
                                 fprintf (stderr, "Can't parse npeers %s\n",
                                          optarg);
                                 return (1);
@@ -174,7 +174,7 @@ main (int argc, char **argv)
 
         if ((!setup && !set_bid) ||
             npeers <= 0 ||
-           device < 0 ||
+            device < 0 ||
             optind == argc) {
                 fprintf (stderr, "%s not specified\n",
                          (!setup && !set_bid) ? "id" :
@@ -182,40 +182,40 @@ main (int argc, char **argv)
                          device < 0 ? "device" : "object id");
                 return (1);
         }
-        
+
         oid = strtoull (argv[optind], &end, 0);
         if (end == argv[optind] || *end != 0) {
                 fprintf (stderr, "Can't parse object id %s\n",
                          argv[optind]);
                 return (1);
         }
-        
+
         conn = obdio_connect (device);
         if (conn == NULL)
                 return (1);
 
-       b = obdio_new_barrier (oid, bid, npeers);
-       if (b == NULL)
-               return (1);
+        b = obdio_new_barrier (oid, bid, npeers);
+        if (b == NULL)
+                return (1);
 
         rc = 0;
-       if (setup) {
-               rc = obdio_setup_barrier (conn, b);
+        if (setup) {
+                rc = obdio_setup_barrier (conn, b);
                 if (rc == 0)
                         printf ("Setup barrier: -d %d -i "LPX64" -p %d -n1 "LPX64"\n",
                                 device, bid, npeers, oid);
-       } else {
-               for (c = 0; c < reps; c++) {
-                       rc = obdio_barrier (conn, b);
-                       if (rc != 0)
-                               break;
-                       if (exponential_modulus (c, 10))
-                               printf ("%s: Barrier %d\n", hostname, c);
-               }
-       }
-
-       free (b);
-        
+        } else {
+                for (c = 0; c < reps; c++) {
+                        rc = obdio_barrier (conn, b);
+                        if (rc != 0)
+                                break;
+                        if (exponential_modulus (c, 10))
+                                printf ("%s: Barrier %d\n", hostname, c);
+                }
+        }
+
+        free (b);
+
         obdio_disconnect (conn);
 
         return (rc == 0 ? 0 : 1);
index 65a4cac..8264761 100644 (file)
@@ -30,9 +30,9 @@
 #include "obdiolib.h"
 
 int
-obdio_test_fixed_extent (struct obdio_conn *conn, 
-                         uint32_t myhid, uint32_t mypid, 
-                         int reps, int locked, uint64_t oid, 
+obdio_test_fixed_extent (struct obdio_conn *conn,
+                         uint32_t myhid, uint32_t mypid,
+                         int reps, int locked, uint64_t oid,
                          uint64_t offset, uint32_t size)
 {
         struct lustre_handle fh;
@@ -44,7 +44,7 @@ obdio_test_fixed_extent (struct obdio_conn *conn,
         int                  j;
         int                  rc;
         int                  rc2;
-        
+
         rc = obdio_open (conn, oid, &fh);
         if (rc != 0) {
                 fprintf (stderr, "Failed to open object "LPX64": %s\n",
@@ -58,7 +58,7 @@ obdio_test_fixed_extent (struct obdio_conn *conn,
                 rc = -1;
                 goto out_0;
         }
-        
+
         for (i = 0; i < reps; i++) {
                 ibuf = (uint32_t *) buffer;
                 for (j = 0; j < size / (4 * sizeof (*ibuf)); j++) {
@@ -77,7 +77,7 @@ obdio_test_fixed_extent (struct obdio_conn *conn,
                                 goto out_1;
                         }
                 }
-                
+
                 rc = obdio_pwrite (conn, oid, buffer, size, offset);
                 if (rc != 0) {
                         fprintf (stderr, "Error writing "LPX64" @ "LPU64" for %u: %s\n",
@@ -87,9 +87,9 @@ obdio_test_fixed_extent (struct obdio_conn *conn,
                         rc = -1;
                         goto out_1;
                 }
-                
+
                 memset (buffer, 0xbb, size);
-                
+
                 rc = obdio_pread (conn, oid, buffer, size, offset);
                 if (rc != 0) {
                         fprintf (stderr, "Error reading "LPX64" @ "LPU64" for %u: %s\n",
@@ -109,7 +109,7 @@ obdio_test_fixed_extent (struct obdio_conn *conn,
                                 goto out_1;
                         }
                 }
-                
+
                 ibuf = (uint32_t *) buffer;
                 for (j = 0; j < size / (4 * sizeof (*ibuf)); j++) {
                         if (ibuf[0] != myhid ||
@@ -177,20 +177,20 @@ parse_kmg (uint64_t *valp, char *str)
 }
 
 void
-usage (char *cmdname, int help) 
+usage (char *cmdname, int help)
 {
         char *name = strrchr (cmdname, '/');
-        
+
         if (name == NULL)
                 name = cmdname;
-        
+
         fprintf (help ? stdout : stderr,
                  "usage: %s -d device -s size -o offset [-i id][-n reps][-l] oid\n",
                  name);
 }
 
 int
-main (int argc, char **argv) 
+main (int argc, char **argv)
 {
         uint32_t           mypid = getpid ();
         uint32_t           myhid = gethostid ();
@@ -214,7 +214,7 @@ main (int argc, char **argv)
                 case 'h':
                         usage (argv[0], 1);
                         return (0);
-                        
+
                 case 'i':
                         switch (sscanf (optarg, "%i.%i", &v1, &v2)) {
                         case 1:
@@ -230,7 +230,7 @@ main (int argc, char **argv)
                                 return (1);
                         }
                         break;
-                        
+
                 case 's':
                         if (parse_kmg (&val, optarg) != 0) {
                                 fprintf (stderr, "Can't parse size %s\n",
@@ -240,7 +240,7 @@ main (int argc, char **argv)
                         size = (uint32_t)val;
                         set_size++;
                         break;
-                        
+
                 case 'o':
                         if (parse_kmg (&val, optarg) != 0) {
                                 fprintf (stderr, "Can't parse offset %s\n",
@@ -282,21 +282,21 @@ main (int argc, char **argv)
                          device < 0 ? "device" : "object id");
                 return (1);
         }
-        
+
         oid = strtoull (argv[optind], &end, 0);
         if (end == argv[optind] || *end != 0) {
                 fprintf (stderr, "Can't parse object id %s\n",
                          argv[optind]);
                 return (1);
         }
-        
+
         conn = obdio_connect (device);
         if (conn == NULL)
                 return (1);
-        
-        rc = obdio_test_fixed_extent (conn, myhid, mypid, reps, locked, 
+
+        rc = obdio_test_fixed_extent (conn, myhid, mypid, reps, locked,
                                       oid, base_offset, size);
-        
+
         obdio_disconnect (conn);
 
         return (rc == 0 ? 0 : 1);
index 0404808..8c79c67 100644 (file)
@@ -44,30 +44,30 @@ obdio_iocinit (struct obdio_conn *conn)
 }
 
 int
-obdio_ioctl (struct obdio_conn *conn, int cmd) 
+obdio_ioctl (struct obdio_conn *conn, int cmd)
 {
         char *buf = conn->oc_buffer;
         int   rc;
         int   rc2;
-        
+
         rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
         if (rc != 0) {
-                fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n", 
+                fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n",
                          rc, strerror (errno));
                 abort ();
         }
-        
+
         rc = ioctl (conn->oc_fd, cmd, buf);
         if (rc != 0)
                 return (rc);
-        
+
         rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
         if (rc2 != 0) {
                 fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
                          rc2, strerror (errno));
                 abort ();
         }
-        
+
         return (rc);
 }
 
@@ -83,9 +83,9 @@ obdio_connect (int device)
                 return (NULL);
         }
         memset (conn, 0, sizeof (*conn));
-        
-       conn->oc_fd = open ("/dev/obd", O_RDWR);
-       if (conn->oc_fd < 0) {
+
+        conn->oc_fd = open ("/dev/obd", O_RDWR);
+        if (conn->oc_fd < 0) {
                 fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
                          strerror (errno));
                 goto failed;
@@ -99,7 +99,7 @@ obdio_connect (int device)
                          device, strerror (errno));
                 goto failed;
         }
-        
+
         obdio_iocinit (conn);
         rc = obdio_ioctl (conn, OBD_IOC_CONNECT);
         if (rc != 0) {
@@ -107,18 +107,18 @@ obdio_connect (int device)
                          device, strerror (errno));
                 goto failed;
         }
-        
+
         conn->oc_conn_addr = conn->oc_data.ioc_addr;
         conn->oc_conn_cookie = conn->oc_data.ioc_cookie;
         return (conn);
-        
+
  failed:
         free (conn);
         return (NULL);
 }
 
 void
-obdio_disconnect (struct obdio_conn *conn) 
+obdio_disconnect (struct obdio_conn *conn)
 {
         close (conn->oc_fd);
         /* obdclass will automatically close on last ref */
@@ -126,18 +126,18 @@ obdio_disconnect (struct obdio_conn *conn)
 }
 
 int
-obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) 
+obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
 {
         int    rc;
-        
+
         obdio_iocinit (conn);
-        
+
         conn->oc_data.ioc_obdo1.o_id = oid;
         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
-        
+
         rc = obdio_ioctl (conn, OBD_IOC_OPEN);
-        
+
         if (rc == 0)
                 memcpy (fh, obdo_handle(&conn->oc_data.ioc_obdo1), sizeof (*fh));
 
@@ -145,26 +145,26 @@ obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
 }
 
 int
-obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) 
+obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
 {
         obdio_iocinit (conn);
-        
+
 
         conn->oc_data.ioc_obdo1.o_id = oid;
         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), fh, sizeof (*fh));
-        conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | 
+        conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
                                           OBD_MD_FLMODE | OBD_MD_FLHANDLE;
-        
+
         return (obdio_ioctl (conn, OBD_IOC_CLOSE));
 }
 
 int
-obdio_pread (struct obdio_conn *conn, uint64_t oid, 
-             char *buffer, uint32_t count, uint64_t offset) 
+obdio_pread (struct obdio_conn *conn, uint64_t oid,
+             char *buffer, uint32_t count, uint64_t offset)
 {
         obdio_iocinit (conn);
-        
+
         conn->oc_data.ioc_obdo1.o_id = oid;
         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
@@ -178,11 +178,11 @@ obdio_pread (struct obdio_conn *conn, uint64_t oid,
 }
 
 int
-obdio_pwrite (struct obdio_conn *conn, uint64_t oid, 
-              char *buffer, uint32_t count, uint64_t offset) 
+obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
+              char *buffer, uint32_t count, uint64_t offset)
 {
         obdio_iocinit (conn);
-        
+
         conn->oc_data.ioc_obdo1.o_id = oid;
         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
@@ -201,9 +201,9 @@ obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
                struct lustre_handle *lh)
 {
         int   rc;
-        
+
         obdio_iocinit (conn);
-        
+
         conn->oc_data.ioc_obdo1.o_id = oid;
         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
@@ -211,12 +211,12 @@ obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
         conn->oc_data.ioc_conn1 = mode;
         conn->oc_data.ioc_count = count;
         conn->oc_data.ioc_offset = offset;
-        
+
         rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
-        
+
         if (rc == 0)
                 memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
-        
+
         return (rc);
 }
 
@@ -227,40 +227,40 @@ obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
 
         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
-        
+
         return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
 }
 
 void *
-obdio_alloc_aligned_buffer (void **spacep, int size) 
+obdio_alloc_aligned_buffer (void **spacep, int size)
 {
         int   pagesize = getpagesize();
         void *space = malloc (size + pagesize - 1);
-        
+
         *spacep = space;
         if (space == NULL)
                 return (NULL);
-        
+
         return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
 }
 
 struct obdio_barrier *
-obdio_new_barrier (uint64_t oid, uint64_t id, int npeers) 
+obdio_new_barrier (uint64_t oid, uint64_t id, int npeers)
 {
-       struct obdio_barrier *b;
-
-       b = (struct obdio_barrier *)malloc (sizeof (*b));
-       if (b == NULL) {
-               fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
-               return (NULL);
-       }
-       
-       b->ob_id = id;
-       b->ob_oid = oid;
-       b->ob_npeers = npeers;
-       b->ob_ordinal = 0;
-       b->ob_count = 0;
-       return (b);
+        struct obdio_barrier *b;
+
+        b = (struct obdio_barrier *)malloc (sizeof (*b));
+        if (b == NULL) {
+                fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
+                return (NULL);
+        }
+
+        b->ob_id = id;
+        b->ob_oid = oid;
+        b->ob_npeers = npeers;
+        b->ob_ordinal = 0;
+        b->ob_count = 0;
+        return (b);
 }
 
 int
@@ -273,86 +273,86 @@ obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
         void                   *space;
         struct obdio_barrier   *fileb;
 
-       if (b->ob_ordinal != 0 ||
-           b->ob_count != 0) {
-               fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
-               abort ();
-       }
-       
+        if (b->ob_ordinal != 0 ||
+            b->ob_count != 0) {
+                fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
+                abort ();
+        }
+
         rc = obdio_open (conn, b->ob_oid, &fh);
         if (rc != 0) {
                 fprintf (stderr, "obdio_setup_barrier "LPX64": Failed to open object: %s\n",
                          b->ob_oid, strerror (errno));
                 return (rc);
         }
-        
+
         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
         if (fileb == NULL) {
                 fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
-                        b->ob_oid);
+                         b->ob_oid);
                 rc = -1;
                 goto out_0;
         }
-        
+
         memset (fileb, 0, getpagesize ());
-       *fileb = *b;
-        
+        *fileb = *b;
+
         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
         if (rc != 0) {
                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
                          b->ob_oid, strerror (errno));
                 goto out_1;
         }
-        
+
         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
-       if (rc != 0)
-               fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
-                        b->ob_oid, strerror (errno));
-       
-       rc2 = obdio_cancel (conn, &lh);
-       if (rc == 0 && rc2 != 0) {
-               fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
-                        b->ob_oid, strerror (errno));
-               rc = rc2;
-       }
+        if (rc != 0)
+                fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
+                         b->ob_oid, strerror (errno));
+
+        rc2 = obdio_cancel (conn, &lh);
+        if (rc == 0 && rc2 != 0) {
+                fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
+                         b->ob_oid, strerror (errno));
+                rc = rc2;
+        }
  out_1:
-       free (space);
+        free (space);
  out_0:
-       rc2 = obdio_close (conn, b->ob_oid, &fh);
-       if (rc == 0 && rc2 != 0) {
-               fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n",
-                        b->ob_oid, strerror (errno));
-               rc = rc2;
-       }
-       
-       return (rc);
+        rc2 = obdio_close (conn, b->ob_oid, &fh);
+        if (rc == 0 && rc2 != 0) {
+                fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n",
+                         b->ob_oid, strerror (errno));
+                rc = rc2;
+        }
+
+        return (rc);
 }
 
 int
 obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
 {
         struct lustre_handle   fh;
-       struct lustre_handle   lh;
-       int                    rc;
-       int                    rc2;
+        struct lustre_handle   lh;
+        int                    rc;
+        int                    rc2;
         void                  *space;
         struct obdio_barrier  *fileb;
-       char                  *mode;
-
-       rc = obdio_open (conn, b->ob_oid, &fh);
-       if (rc != 0) {
-               fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n",
-                        b->ob_oid, strerror (errno));
-               return (rc);
-       }
-       
+        char                  *mode;
+
+        rc = obdio_open (conn, b->ob_oid, &fh);
+        if (rc != 0) {
+                fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n",
+                         b->ob_oid, strerror (errno));
+                return (rc);
+        }
+
         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
-       if (fileb == NULL) {
-               fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
-                        b->ob_oid);
-               rc = -1;
-               goto out_0;
-       }
+        if (fileb == NULL) {
+                fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
+                         b->ob_oid);
+                rc = -1;
+                goto out_0;
+        }
 
         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
         if (rc != 0) {
@@ -360,107 +360,107 @@ obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
                          b->ob_oid, strerror (errno));
                 goto out_1;
         }
-       
-       memset (fileb, 0xeb, getpagesize ());
-       rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
-       if (rc != 0) {
-               fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
-                        b->ob_oid, strerror (errno));
-               goto out_2;
-       }
-       
-       if (fileb->ob_id != b->ob_id ||
-           fileb->ob_oid != b->ob_oid ||
-           fileb->ob_npeers != b->ob_npeers ||
-           fileb->ob_count >= b->ob_npeers ||
-           fileb->ob_ordinal != b->ob_ordinal) {
-               fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
-               fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
-                        fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, 
-                        fileb->ob_ordinal, fileb->ob_count);
-               fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
-                        b->ob_id, b->ob_oid, b->ob_npeers, 
-                        b->ob_ordinal, b->ob_count);
-               rc = -1;
-               goto out_2;
-       }
-       
-       fileb->ob_count++;
-       if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
-               fileb->ob_count = 0;            /* join count for next barrier */
-               fileb->ob_ordinal++;            /* signal all joined */
-       }
-
-       rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
-       if (rc != 0) {
-               fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
-                        b->ob_oid, strerror (errno));
-               goto out_2;
-       }
-
-       mode = "PW";
-       b->ob_ordinal++;                        /* now I wait... */
-       while (fileb->ob_ordinal != b->ob_ordinal) {
-
-               rc = obdio_cancel (conn, &lh);
-               if (rc != 0) {
-                       fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
-                                b->ob_oid, mode, strerror (errno));
-                       goto out_1;
-               }
-
-               mode = "PR";
-               rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
-               if (rc != 0) {
-                       fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
-                                b->ob_oid, strerror (errno));
-                       goto out_1;
-               }
-               
-               memset (fileb, 0xeb, getpagesize ());
-               rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
-               if (rc != 0) {
-                       fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
-                                b->ob_oid, strerror (errno));
-                       goto out_2;
-               }
-               
-               if (fileb->ob_id != b->ob_id ||
-                   fileb->ob_oid != b->ob_oid ||
-                   fileb->ob_npeers != b->ob_npeers ||
-                   fileb->ob_count >= b->ob_npeers ||
-                   (fileb->ob_ordinal != b->ob_ordinal - 1 &&
-                    fileb->ob_ordinal != b->ob_ordinal)) {
-                       fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
-                       fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
-                                fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, 
-                                fileb->ob_ordinal, fileb->ob_count);
-                       fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
-                                b->ob_id, b->ob_oid, b->ob_npeers, 
-                                b->ob_ordinal, b->ob_count);
-                       rc = -1;
-                       goto out_2;
-               }
-       }
-                       
+
+        memset (fileb, 0xeb, getpagesize ());
+        rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+        if (rc != 0) {
+                fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
+                         b->ob_oid, strerror (errno));
+                goto out_2;
+        }
+
+        if (fileb->ob_id != b->ob_id ||
+            fileb->ob_oid != b->ob_oid ||
+            fileb->ob_npeers != b->ob_npeers ||
+            fileb->ob_count >= b->ob_npeers ||
+            fileb->ob_ordinal != b->ob_ordinal) {
+                fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
+                fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+                         fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
+                         fileb->ob_ordinal, fileb->ob_count);
+                fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+                         b->ob_id, b->ob_oid, b->ob_npeers,
+                         b->ob_ordinal, b->ob_count);
+                rc = -1;
+                goto out_2;
+        }
+
+        fileb->ob_count++;
+        if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
+                fileb->ob_count = 0;       /* join count for next barrier */
+                fileb->ob_ordinal++;                 /* signal all joined */
+        }
+
+        rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+        if (rc != 0) {
+                fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
+                         b->ob_oid, strerror (errno));
+                goto out_2;
+        }
+
+        mode = "PW";
+        b->ob_ordinal++;           /* now I wait... */
+        while (fileb->ob_ordinal != b->ob_ordinal) {
+
+                rc = obdio_cancel (conn, &lh);
+                if (rc != 0) {
+                        fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
+                                 b->ob_oid, mode, strerror (errno));
+                        goto out_1;
+                }
+
+                mode = "PR";
+                rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
+                if (rc != 0) {
+                        fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
+                                 b->ob_oid, strerror (errno));
+                        goto out_1;
+                }
+
+                memset (fileb, 0xeb, getpagesize ());
+                rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+                if (rc != 0) {
+                        fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
+                                 b->ob_oid, strerror (errno));
+                        goto out_2;
+                }
+
+                if (fileb->ob_id != b->ob_id ||
+                    fileb->ob_oid != b->ob_oid ||
+                    fileb->ob_npeers != b->ob_npeers ||
+                    fileb->ob_count >= b->ob_npeers ||
+                    (fileb->ob_ordinal != b->ob_ordinal - 1 &&
+                     fileb->ob_ordinal != b->ob_ordinal)) {
+                        fprintf (stderr, "obdio_barrier "LPX