-TBD
- * version v0_5_21
+2003-03-11 Phil Schwan <phil@clusterfs.com>
+ * version v0_6
* bug fixes
- LDLM_DEBUG macro fix, for gcc 3.2 (850)
- failed open()s could cause deadlock; fixed (867, 869)
- if a bad lock AST arrives, send an error instead of dropping entirely
- return 0 from revalidate2 if ll_intent_lock returns -EINTR (912)
- fix leak in bulk IO when only partially completed (899, 900, 926)
+ - fix O_DIRECT for ia64 (55)
+ - (almost) eliminate Lustre-kernel-thread effects on load average (722)
+ - C-z after timeout could hang a process forever; fixed (977)
+ * Features
+ - client-side I/O cache (678, 924, 929, 941, 970)
* protocol changes
- READPAGE and SETATTRs which don't take server-side locks get
their own portal
SUBDIRS+= llite obdecho lov cobd tests doc scripts conf
endif
-DIST_SUBDIRS = $(SUBDIRS)
+DIST_SUBDIRS = $(SUBDIRS) liblustre
EXTRA_DIST = BUGS FDL Rules include archdep.m4 kernel_patches
# We get the version from the spec file.
#######################################################################
# lustre ldap config database
-# $Id: slapd-lustre.conf,v 1.2 2003/01/06 22:17:53 adilger Exp $
+# $Id: slapd-lustre.conf,v 1.3 2003/03/11 23:36:45 pschwan Exp $
#######################################################################
database ldbm
AC_SUBST(LIBREADLINE)
AC_SUBST(HAVE_LIBREADLINE)
+AC_ARG_ENABLE(efence, [ --enable-efence use efence library],,
+ enable_efence="no")
+
+if test "$enable_efence" = "yes" ; then
+ LIBEFENCE="-lefence"
+ HAVE_LIBEFENCE="-DHAVE_LIBEFENCE=1"
+else
+ LIBEFENCE=""
+ HAVE_LIBEFENCE=""
+fi
+AC_SUBST(LIBEFENCE)
+AC_SUBST(HAVE_LIBEFENCE)
+
# XXX this should be a runtime option
+AC_MSG_CHECKING(if you are enabling OST recovery...)
AC_ARG_ENABLE(ost_recovery, [ --enable-ost-recovery: enable support for ost recovery],,
- enable_ost_recovery="yes")
+ enable_ost_recovery="no")
if test "$enable_ost_recovery" = "yes" ; then
ENABLE_OST_RECOVERY="-DOST_RECOVERY=1"
+ AC_MSG_RESULT(yes)
else
- HAVE_LIBREADLINE=""
+ ENABLE_OST_RECOVERY=""
+ AC_MSG_RESULT(no)
fi
AC_SUBST(ENABLE_OST_RECOVERY)
else
KINCFLAGS='-I$(top_srcdir)/include -I$(PORTALS)/include'
fi
-CPPFLAGS="$KINCFLAGS $ARCHCPPFLAGS"
+CPPFLAGS="$KINCFLAGS $ARCHCPPFLAGS $ENABLE_OST_RECOVERY"
if test $host_cpu != "lib" ; then
AC_MSG_CHECKING(if make dep has been run in kernel source (host $host_cpu) )
dnl We need to rid ourselves of the nasty [ ] quotes.
changequote(, )
dnl Get release from version.h
-RELEASE="`sed -ne 's/.*UTS_RELEASE[ \"]*\([0-9.a-zA-Z-]*\).*/\1/p' $LINUX/include/linux/version.h`"
+RELEASE="`sed -ne 's/.*UTS_RELEASE[ \"]*\([0-9.a-zA-Z_-]*\).*/\1/p' $LINUX/include/linux/version.h`"
changequote([, ])
moduledir='$(libdir)/modules/'$RELEASE/kernel
--- /dev/null
+===== fs/ext3/ialloc.c 1.26 vs edited =====
+--- 1.26/fs/ext3/ialloc.c Fri Feb 14 19:24:09 2003
++++ edited/fs/ext3/ialloc.c Sat Mar 8 01:20:55 2003
+@@ -195,6 +195,36 @@
+ }
+
+ /*
++ * @block_group: block group of inode
++ * @offset: relative offset of inode within @block_group
++ *
++ * Check whether any of the inodes in this disk block are in use.
++ *
++ * Caller must be holding superblock lock (group/bitmap read lock in
++ * future).
++ */
++int ext3_itable_block_used(struct super_block *sb, unsigned int block_group,
++ int offset)
++{
++ struct buffer_head *ibitmap = read_inode_bitmap(sb, block_group);
++ int inodes_per_block;
++ unsigned long inum, iend;
++
++ if (!ibitmap)
++ return 1;
++
++ inodes_per_block = sb->s_blocksize / EXT3_SB(sb)->s_inode_size;
++ inum = offset & ~(inodes_per_block - 1);
++ iend = inum + inodes_per_block;
++ for (; inum < iend; inum++) {
++ if (inum != offset && ext3_test_bit(inum, ibitmap->b_data))
++ return 1;
++ }
++
++ return 0;
++}
++
++/*
+ * There are two policies for allocating an inode. If the new inode is
+ * a directory, then a forward search is made for a block group with both
+ * free space and a low directory-to-inode ratio; if that fails, then of
+@@ -422,8 +452,9 @@
+ struct ext3_group_desc * gdp;
+ struct ext3_super_block * es;
+ struct ext3_inode_info *ei;
+- int err = 0;
++ struct ext3_iloc iloc;
+ struct inode *ret;
++ int err = 0;
+
+ /* Cannot create files in a deleted directory */
+ if (!dir || !dir->i_nlink)
+@@ -587,16 +618,23 @@
+ goto fail2;
+ }
+ err = ext3_init_acl(handle, inode, dir);
++ if (err)
++ goto fail3;
++
++ err = ext3_get_inode_loc_new(inode, &iloc, 1);
++ if (err)
++ goto fail3;
++
++ BUFFER_TRACE(iloc.bh, "get_write_access");
++ err = ext3_journal_get_write_access(handle, iloc.bh);
+ if (err) {
+- DQUOT_FREE_INODE(inode);
+- goto fail2;
+- }
+- err = ext3_mark_inode_dirty(handle, inode);
+- if (err) {
+- ext3_std_error(sb, err);
+- DQUOT_FREE_INODE(inode);
+- goto fail2;
+- }
++ brelse(iloc.bh);
++ iloc.bh = NULL;
++ goto fail3;
++ }
++ err = ext3_mark_iloc_dirty(handle, inode, &iloc);
++ if (err)
++ goto fail3;
+
+ ext3_debug("allocating inode %lu\n", inode->i_ino);
+ goto really_out;
+@@ -610,6 +648,9 @@
+ brelse(bitmap_bh);
+ return ret;
+
++fail3:
++ ext3_std_error(sb, err);
++ DQUOT_FREE_INODE(inode);
+ fail2:
+ inode->i_flags |= S_NOQUOTA;
+ inode->i_nlink = 0;
+===== fs/ext3/inode.c 1.62 vs edited =====
+--- 1.62/fs/ext3/inode.c Fri Feb 14 19:24:09 2003
++++ edited/fs/ext3/inode.c Sat Mar 8 02:10:39 2003
+@@ -2144,69 +2144,118 @@
+ unlock_kernel();
+ }
+
+-/*
+- * ext3_get_inode_loc returns with an extra refcount against the
+- * inode's underlying buffer_head on success.
+- */
++#define NUM_INODE_PREREAD 16
+
+-int ext3_get_inode_loc (struct inode *inode, struct ext3_iloc *iloc)
++/*
++ * ext3_get_inode_loc returns with an extra refcount against the inode's
++ * underlying buffer_head on success. If this is for a new inode allocation
++ * (new is non-zero) then we may be able to optimize away the read if there
++ * are no other in-use inodes in this inode table block. If we need to do
++ * a read, then read in a whole chunk of blocks to avoid blocking again soon
++ * if we are doing lots of creates/updates.
++ */
++int ext3_get_inode_loc_new(struct inode *inode, struct ext3_iloc *iloc, int new)
+ {
+- struct buffer_head *bh = 0;
++ struct buffer_head *bh[NUM_INODE_PREREAD];
++ struct super_block *sb = inode->i_sb;
++ struct ext3_sb_info *sbi = EXT3_SB(sb);
++ unsigned long ino = inode->i_ino;
+ unsigned long block;
+ unsigned long block_group;
+ unsigned long group_desc;
+ unsigned long desc;
+ unsigned long offset;
+ struct ext3_group_desc * gdp;
+-
+- if ((inode->i_ino != EXT3_ROOT_INO &&
+- inode->i_ino != EXT3_JOURNAL_INO &&
+- inode->i_ino < EXT3_FIRST_INO(inode->i_sb)) ||
+- inode->i_ino > le32_to_cpu(
+- EXT3_SB(inode->i_sb)->s_es->s_inodes_count)) {
+- ext3_error (inode->i_sb, "ext3_get_inode_loc",
+- "bad inode number: %lu", inode->i_ino);
++
++ if ((ino != EXT3_ROOT_INO && ino != EXT3_JOURNAL_INO &&
++ ino < EXT3_FIRST_INO(sb)) ||
++ ino > le32_to_cpu(sbi->s_es->s_inodes_count)) {
++ ext3_error(sb, "ext3_get_inode_loc", "bad inode number: %lu",
++ ino);
+ goto bad_inode;
+ }
+- block_group = (inode->i_ino - 1) / EXT3_INODES_PER_GROUP(inode->i_sb);
+- if (block_group >= EXT3_SB(inode->i_sb)->s_groups_count) {
+- ext3_error (inode->i_sb, "ext3_get_inode_loc",
+- "group >= groups count");
++ block_group = (ino - 1) / EXT3_INODES_PER_GROUP(sb);
++ if (block_group >= EXT3_SB(sb)->s_groups_count) {
++ ext3_error(sb, "ext3_get_inode_loc", "group >= groups count");
+ goto bad_inode;
+ }
+- group_desc = block_group >> EXT3_DESC_PER_BLOCK_BITS(inode->i_sb);
+- desc = block_group & (EXT3_DESC_PER_BLOCK(inode->i_sb) - 1);
+- bh = EXT3_SB(inode->i_sb)->s_group_desc[group_desc];
+- if (!bh) {
+- ext3_error (inode->i_sb, "ext3_get_inode_loc",
+- "Descriptor not loaded");
++ group_desc = block_group >> EXT3_DESC_PER_BLOCK_BITS(sb);
++ desc = block_group & (EXT3_DESC_PER_BLOCK(sb) - 1);
++ if (!sbi->s_group_desc[group_desc]) {
++ ext3_error(sb, "ext3_get_inode_loc", "Descriptor not loaded");
+ goto bad_inode;
+ }
+
+- gdp = (struct ext3_group_desc *) bh->b_data;
++ gdp = (struct ext3_group_desc *)(sbi->s_group_desc[group_desc]->b_data);
+ /*
+ * Figure out the offset within the block group inode table
+ */
+- offset = ((inode->i_ino - 1) % EXT3_INODES_PER_GROUP(inode->i_sb)) *
+- EXT3_INODE_SIZE(inode->i_sb);
++ offset = ((ino - 1) % EXT3_INODES_PER_GROUP(sb));
+ block = le32_to_cpu(gdp[desc].bg_inode_table) +
+- (offset >> EXT3_BLOCK_SIZE_BITS(inode->i_sb));
+- if (!(bh = sb_bread(inode->i_sb, block))) {
+- ext3_error (inode->i_sb, "ext3_get_inode_loc",
+- "unable to read inode block - "
+- "inode=%lu, block=%lu", inode->i_ino, block);
+- goto bad_inode;
++ (offset * sbi->s_inode_size >> EXT3_BLOCK_SIZE_BITS(sb));
++ bh[0] = sb_getblk(sb, block);
++ if (buffer_uptodate(bh[0]))
++ goto done;
++
++ /* If we don't really need to read this block, and it isn't already
++ * in memory, then we just zero it out. Otherwise, we keep the
++ * current block contents (deleted inode data) for posterity.
++ */
++ if (new && !ext3_itable_block_used(sb, block_group, offset)) {
++ lock_buffer(bh[0]);
++ memset(bh[0]->b_data, 0, bh[0]->b_size);
++ set_buffer_uptodate(bh[0]);
++ unlock_buffer(bh[0]);
++ } else {
++ unsigned long block_end, itable_end;
++ int count = 1;
++
++ itable_end = le32_to_cpu(gdp[desc].bg_inode_table) +
++ sbi->s_itb_per_group;
++ block_end = block + NUM_INODE_PREREAD;
++ if (block_end > itable_end)
++ block_end = itable_end;
++
++ for (; block < block_end; block++) {
++ bh[count] = sb_getblk(sb, block);
++ if (count && (buffer_uptodate(bh[count]) ||
++ buffer_locked(bh[count]))) {
++ __brelse(bh[count]);
++ } else
++ count++;
++ }
++
++ ll_rw_block(READ, count, bh);
++
++ /* Release all but the block we actually need (bh[0]) */
++ while (--count > 0)
++ __brelse(bh[count]);
++
++ wait_on_buffer(bh[0]);
++ if (!buffer_uptodate(bh[0])) {
++ ext3_error(sb, __FUNCTION__,
++ "unable to read inode block - "
++ "inode=%lu, block=%llu", ino,
++ (unsigned long long)bh[0]->b_blocknr);
++ goto bad_inode;
++ }
+ }
+- offset &= (EXT3_BLOCK_SIZE(inode->i_sb) - 1);
++done:
++ offset = (offset * sbi->s_inode_size) & (EXT3_BLOCK_SIZE(sb) - 1);
+
+- iloc->bh = bh;
+- iloc->raw_inode = (struct ext3_inode *) (bh->b_data + offset);
++ iloc->bh = bh[0];
++ iloc->raw_inode = (struct ext3_inode *)(bh[0]->b_data + offset);
+ iloc->block_group = block_group;
+-
++
+ return 0;
+-
++
+ bad_inode:
+ return -EIO;
++}
++
++int ext3_get_inode_loc(struct inode *inode, struct ext3_iloc *iloc)
++{
++ return ext3_get_inode_loc_new(inode, iloc, 0);
+ }
+
+ void ext3_read_inode(struct inode * inode)
+===== include/linux/ext3_fs.h 1.22 vs edited =====
+--- 1.22/include/linux/ext3_fs.h Tue Jan 14 00:56:29 2003
++++ edited/include/linux/ext3_fs.h Sat Mar 8 01:56:28 2003
+@@ -719,6 +719,8 @@
+ extern struct buffer_head * ext3_getblk (handle_t *, struct inode *, long, int, int *);
+ extern struct buffer_head * ext3_bread (handle_t *, struct inode *, int, int, int *);
+
++extern int ext3_itable_block_used(struct super_block *, unsigned int, int);
++extern int ext3_get_inode_loc_new(struct inode *, struct ext3_iloc *, int);
+ extern int ext3_get_inode_loc (struct inode *, struct ext3_iloc *);
+ extern void ext3_read_inode (struct inode *);
+ extern void ext3_write_inode (struct inode *, int);
diff -ru lustre-head/fs/extN/inode.c lustre/fs/extN/inode.c
--- lustre-head/fs/extN/inode.c Mon Dec 23 10:02:58 2002
+++ lustre/fs/extN/inode.c Mon Dec 23 09:50:25 2002
-@@ -2011,23 +1994,32 @@
+@@ -2011,23 +1994,28 @@
extN_journal_stop(handle, inode);
}
- * extN_get_inode_loc returns with an extra refcount against the
- * inode's underlying buffer_head on success.
- */
-+extern int extN_itable_block_used(struct super_block *sb,
-+ unsigned int block_group,
-+ int offset);
-+
-+#define NUM_INODE_PREREAD 16
++#define NUM_INODE_PREREAD 16
-int extN_get_inode_loc (struct inode *inode, struct extN_iloc *iloc)
+/*
void extN_read_inode(struct inode * inode)
{
struct extN_iloc iloc;
+diff -ru include/linux/extN_fs.h.orig include/linux/extN_fs.h
+--- lustre/include/linux/extN_fs.h.orig Sat Mar 8 01:23:09 2003
++++ lustre/include/linux/extN_fs.h Sat Mar 8 01:24:31 2003
+@@ -642,6 +646,8 @@
+ extern struct buffer_head * extN_getblk (handle_t *, struct inode *, long, int, int *);
+ extern struct buffer_head * extN_bread (handle_t *, struct inode *, int, int, int *);
+
++extern int extN_itable_block_used(struct super_block *sb, unsigned int, int);
++extern int extN_get_inode_loc_new(struct inode *, struct extN_iloc *, int);
+ extern int extN_get_inode_loc (struct inode *, struct extN_iloc *);
+ extern void extN_read_inode (struct inode *);
+ extern void extN_write_inode (struct inode *, int);
#define kfree(a) free(a)
#define GFP_KERNEL 1
#define GFP_HIGHUSER 1
-#define IS_ERR(a) (abs((int)(a)) < 500 ? 1 : 0)
+#define IS_ERR(a) (((a) && abs((int)(a)) < 500) ? 1 : 0)
#define PTR_ERR(a) ((int)(a))
#define capable(foo) 1
if (!pg)
return NULL;
#ifdef MAP_ANONYMOUS
- pg->addr = mmap(0, PAGE_SIZE, PROT_WRITE, MAP_ANONYMOUS, 0, 0);
+ pg->addr = mmap(0, PAGE_SIZE, PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
#else
pg->addr = malloc(PAGE_SIZE);
#endif
#define init_waitqueue_head(l) INIT_LIST_HEAD(&(l)->sleepers)
#define wake_up(l) do { int a; a++; } while (0)
-#define wait_event(l,m) do { int a; a++; } while (0)
#define TASK_INTERRUPTIBLE 0
#define TASK_UNINTERRUPTIBLE 1
#define TASK_RUNNING 2
extern struct proc_dir_entry *proc_lustre_root;
+#ifdef LPROCFS
#define LPROCFS_INIT_MULTI_VARS(array, size) \
void lprocfs_init_multi_vars(unsigned int idx, \
struct lprocfs_static_vars *x) \
x->obd_vars = glob[idx].obd_vars; \
} \
-#ifdef LPROCFS
#define LPROCFS_INIT_VARS(vclass, vinstance) \
void lprocfs_init_vars(struct lprocfs_static_vars *x) \
{ \
static inline struct proc_dir_entry *
lprocfs_register(const char *name, struct proc_dir_entry *parent,
struct lprocfs_vars *list, void *data) { return NULL; }
+#define LPROCFS_INIT_MULTI_VARS(array, size)
+static inline void lprocfs_init_multi_vars(unsigned int idx,
+ struct lprocfs_static_vars *x) { return; }
+#define LPROCFS_INIT_VARS(vclass, vinstance)
static inline void lprocfs_init_vars(struct lprocfs_static_vars *x) { return; }
static inline int lprocfs_add_vars(struct proc_dir_entry *root,
struct lprocfs_vars *var,
ELDLM_LOCK_CHANGED = 300,
ELDLM_LOCK_ABORTED = 301,
ELDLM_LOCK_REPLACED = 302,
+ ELDLM_LOCK_MATCHED = 303,
ELDLM_NAMESPACE_EXISTS = 400,
- ELDLM_BAD_NAMESPACE = 401
+ ELDLM_BAD_NAMESPACE = 401,
+ ELDLM_GETATTR_ERROR = 402
} ldlm_error_t;
#define LDLM_NAMESPACE_SERVER 0
*
*/
-#define RES_HASH_BITS 14
+#define RES_HASH_BITS 10
#define RES_HASH_SIZE (1UL << RES_HASH_BITS)
#define RES_HASH_MASK (RES_HASH_SIZE - 1)
void ldlm_cancel_callback(struct ldlm_lock *);
int ldlm_lock_set_data(struct lustre_handle *, void *data, void *cp_data);
void ldlm_lock_remove_from_lru(struct ldlm_lock *);
+struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *,
+ struct lustre_handle *);
static inline struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *h)
{
#define OBD_BRW_WRITE 0x2
#define OBD_BRW_RWMASK (OBD_BRW_READ | OBD_BRW_WRITE)
#define OBD_BRW_CREATE 0x4
+#define OBD_BRW_SYNC 0x8
#define OBD_OBJECT_EOF 0xffffffffffffffffULL
obd_gr ioo_gr;
__u32 ioo_type;
__u32 ioo_bufcnt;
-};
+} __attribute__((packed));
struct niobuf_remote {
__u64 offset;
# include <string.h>
#else
# include <asm/semaphore.h>
+# include <linux/sched.h>
+# include <linux/signal.h>
#endif
#include <linux/types.h>
#include <linux/portals_lib.h>
struct list_head brw_desc_head; /* list of ptlrpc_bulk_desc */
wait_queue_head_t brw_waitq;
atomic_t brw_refcount;
+ atomic_t brw_desc_count;
int brw_flags;
int (*brw_callback)(struct obd_brw_set *, int phase);
lwi_cb_data: data \
})
+#define LUSTRE_FATAL_SIGS (sigmask(SIGKILL) | sigmask(SIGINT) | \
+ sigmask(SIGTERM) | sigmask(SIGQUIT))
+
#ifdef __KERNEL__
-#define l_sigismember sigismember
-#else
-#define l_sigismember(a,b) (*(a) & b)
-#endif
+static inline sigset_t l_w_e_set_sigs(int sigs)
+{
+ sigset_t old;
+ unsigned long irqflags;
-/* XXX this should be one mask-check */
-#define l_killable_pending(task) \
-(l_sigismember(&(task->pending.signal), SIGKILL) || \
- l_sigismember(&(task->pending.signal), SIGINT) || \
- l_sigismember(&(task->pending.signal), SIGTERM))
+ spin_lock_irqsave(&current->sigmask_lock, irqflags);
+ old = current->blocked;
+ siginitsetinv(&current->blocked, sigs);
+ recalc_sigpending(current);
+ spin_unlock_irqrestore(&current->sigmask_lock, irqflags);
+
+ return old;
+}
#define __l_wait_event(wq, condition, info, ret) \
do { \
wait_queue_t __wait; \
- long __state; \
int __timed_out = 0; \
- init_waitqueue_entry(&__wait, current); \
+ unsigned long irqflags; \
+ sigset_t blocked; \
\
+ init_waitqueue_entry(&__wait, current); \
add_wait_queue(&wq, &__wait); \
+ \
+ /* Block all signals (just the non-fatal ones if no timeout). */ \
if (info->lwi_signals && !info->lwi_timeout) \
- __state = TASK_INTERRUPTIBLE; \
+ blocked = l_w_e_set_sigs(LUSTRE_FATAL_SIGS); \
else \
- __state = TASK_UNINTERRUPTIBLE; \
+ blocked = l_w_e_set_sigs(0); \
+ \
for (;;) { \
- set_current_state(__state); \
+ set_current_state(TASK_INTERRUPTIBLE); \
if (condition) \
break; \
- if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) {\
+ if (signal_pending(current)) { \
if (info->lwi_on_signal) \
info->lwi_on_signal(info->lwi_cb_data); \
ret = -EINTR; \
break; \
} \
/* We'll take signals after a timeout. */ \
- if (info->lwi_signals) { \
- __state = TASK_INTERRUPTIBLE; \
- /* Check for a pending interrupt. */ \
- if (info->lwi_signals && l_killable_pending(current)) {\
- if (info->lwi_on_signal) \
- info->lwi_on_signal(info->lwi_cb_data); \
- ret = -EINTR; \
- break; \
- } \
- } \
+ if (info->lwi_signals) \
+ (void)l_w_e_set_sigs(LUSTRE_FATAL_SIGS); \
} \
} else { \
schedule(); \
} \
} \
+ \
+ spin_lock_irqsave(&current->sigmask_lock, irqflags); \
+ current->blocked = blocked; \
+ recalc_sigpending(current); \
+ spin_unlock_irqrestore(&current->sigmask_lock, irqflags); \
+ \
current->state = TASK_RUNNING; \
remove_wait_queue(&wq, &__wait); \
} while(0)
__l_wait_event(wq, condition, __info, __ret); \
__ret; \
})
+#endif /* __KERNEL__ */
#endif /* _LUSTRE_LIB_H */
struct lov_stripe_md *lli_smd;
char *lli_symlink_name;
struct semaphore lli_open_sem;
+ atomic_t lli_open_count; /* see ll_file_release */
+ /*
+ * the VALID flag and valid_sem are temporary measures to serialize
+ * the manual getattrs that we're doing at lock acquisition. in
+ * the future the OST will always return its notion of the file
+ * size with the granted locks.
+ */
+ unsigned long lli_flags;
+#define LLI_F_DID_GETATTR 0
+ struct semaphore lli_getattr_sem;
+ struct list_head lli_read_extents;
+ spinlock_t lli_read_extent_lock;
+
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
struct inode lli_vfs_inode;
#endif
};
+/*
+ * this lets ll_file_read tell ll_readpages how far ahead it can read
+ * and still be covered by ll_file_read's lock. 2.5 won't need this, but
+ * we have the other problem of other readpage callers making sure that
+ * they're covered by a lock..
+ */
+struct ll_read_extent {
+ struct list_head re_lli_item;
+ struct task_struct *re_task;
+ struct ldlm_extent re_extent;
+};
+
+int ll_check_dirty( struct super_block *sb );
+int ll_batch_writepage( struct inode *inode, struct page *page );
+
/* interpet return codes from intent lookup */
#define LL_LOOKUP_POSITIVE 1
#define LL_LOOKUP_NEGATIVE 2
struct ldlm_lock;
int ll_lock_callback(struct ldlm_lock *, struct ldlm_lock_desc *, void *data,
int flag);
-int ll_size_lock(struct inode *, struct lov_stripe_md *, obd_off start,
- int mode, struct lustre_handle *);
-int ll_size_unlock(struct inode *, struct lov_stripe_md *, int mode,
- struct lustre_handle *);
-int ll_file_size(struct inode *inode, struct lov_stripe_md *md, char *ostdata);
+int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
+ struct lov_stripe_md *lsm, int mode,
+ struct ldlm_extent *extent, struct lustre_handle *lockh);
+int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
+ struct lov_stripe_md *lsm, int mode,
+ struct ldlm_extent *extent, struct lustre_handle *lockh);
+int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
+ struct lov_stripe_md *lsm, int mode,
+ struct lustre_handle *lockh);
int ll_create_objects(struct super_block *sb, obd_id id, uid_t uid,
gid_t gid, struct lov_stripe_md **lsmp);
__u64 mcd_last_xid; /* xid for the last transaction */
__u32 mcd_last_result; /* result from last RPC */
__u32 mcd_last_data; /* per-op data (disposition for open &c.) */
- __u8 padding[MDS_LR_SIZE - 58];
+ __u8 padding[MDS_LR_SIZE - 74];
};
/* In-memory access to client data from MDS struct */
int rq_reqlen;
struct lustre_msg *rq_reqmsg;
+ int rq_timeout;
int rq_replen;
struct lustre_msg *rq_repmsg;
__u64 rq_transno;
struct obd_brw_set *obd_brw_set_new(void);
void obd_brw_set_add(struct obd_brw_set *, struct ptlrpc_bulk_desc *);
void obd_brw_set_del(struct ptlrpc_bulk_desc *);
-void obd_brw_set_free(struct obd_brw_set *);
+void obd_brw_set_decref(struct obd_brw_set *set);
+void obd_brw_set_addref(struct obd_brw_set *set);
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req);
#include <linux/types.h>
#include <linux/fs.h>
#include <linux/time.h>
-#endif
+#endif
#include <linux/obd_support.h>
#include <linux/lustre_import.h>
dst->o_valid |= (valid & ~OBD_MD_FLID);
}
+static inline void obdo_refresh_inode(struct inode *dst, struct obdo *src,
+ obd_flag valid)
+{
+ valid &= src->o_valid;
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ if (valid & OBD_MD_FLATIME && src->o_atime > dst->i_atime)
+ dst->i_atime = src->o_atime;
+ if (valid & OBD_MD_FLMTIME && src->o_mtime > dst->i_mtime)
+ dst->i_mtime = src->o_mtime;
+ if (valid & OBD_MD_FLCTIME && src->o_ctime > dst->i_ctime)
+ dst->i_ctime = src->o_ctime;
+#else
+ if (valid & OBD_MD_FLATIME && src->o_atime > dst->i_atime.tv_sec)
+ dst->i_atime.tv_sec = src->o_atime;
+ if (valid & OBD_MD_FLMTIME && src->o_mtime > dst->i_mtime.tv_sec)
+ dst->i_mtime.tv_sec = src->o_mtime;
+ if (valid & OBD_MD_FLCTIME && src->o_ctime > dst->i_ctime.tv_sec)
+ dst->i_ctime.tv_sec = src->o_ctime;
+#endif
+ if (valid & OBD_MD_FLSIZE && src->o_size > dst->i_size)
+ dst->i_size = src->o_size;
+ /* allocation of space */
+ if (valid & OBD_MD_FLBLOCKS && src->o_blocks > dst->i_blocks)
+ dst->i_blocks = src->o_blocks;
+}
+
static inline void obdo_to_inode(struct inode *dst, struct obdo *src,
obd_flag valid)
{
#define LUSTRE_SANOST_NAME "sanost"
/* ost/ost_pack.c */
-void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
- __u32 xid);
-void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp);
-void ost_pack_ioo(struct obd_ioobj **ioop, struct lov_stripe_md *oa,int bufcnt);
-void ost_unpack_ioo(struct obd_ioobj **tmp, struct obd_ioobj **ioop);
+void ost_pack_niobuf(struct niobuf_remote *nb, __u64 offset, __u32 len,
+ __u32 flags, __u32 xid);
+void ost_unpack_niobuf(struct niobuf_remote *dst, struct niobuf_remote *src);
+void ost_pack_ioo(struct obd_ioobj *ioo, struct lov_stripe_md *lsm, int bufcnt);
+void ost_unpack_ioo(struct obd_ioobj *dst, struct obd_ioobj *src);
#endif
extern void ptlbd_blk_register(struct ptlbd_obd *ptlbd);
extern int ptlbd_send_req(struct ptlbd_obd *, ptlbd_cmd_t cmd,
- struct buffer_head *);
+ struct request *);
extern int ptlbd_parse_req(struct ptlrpc_request *req);
#endif
((obd_fail_loc & (OBD_FAILED | OBD_FAIL_ONCE))!= \
(OBD_FAILED | OBD_FAIL_ONCE)))
-#define OBD_FAIL_RETURN(id, ret) \
-do { \
+#define OBD_FAIL_CHECK_ONCE(id) \
+({ int _ret_ = 0; \
if (OBD_FAIL_CHECK(id)) { \
- CERROR("obd_fail_loc=%x, fail operation rc=%d\n", id, ret); \
+ CERROR("obd_fail_loc=%x\n", id); \
obd_fail_loc |= OBD_FAILED; \
if ((id) & OBD_FAIL_ONCE) \
obd_fail_loc |= OBD_FAIL_ONCE; \
+ _ret_ = 1; \
+ } \
+ _ret_; \
+})
+
+#define OBD_FAIL_RETURN(id, ret) \
+do { \
+ if (OBD_FAIL_CHECK_ONCE(id)) { \
RETURN(ret); \
} \
} while(0)
--- /dev/null
+--- linux-2.4.19-hp2_pnnl4_Lv13/fs/inode.c.iod-export 2003-02-27 14:28:04.000000000 -0800
++++ linux-2.4.19-hp2_pnnl4_Lv13/fs/inode.c 2003-03-03 13:54:59.000000000 -0800
+@@ -5,6 +5,7 @@
+ */
+
+ #include <linux/config.h>
++#include <linux/module.h>
+ #include <linux/fs.h>
+ #include <linux/string.h>
+ #include <linux/mm.h>
+@@ -66,7 +67,8 @@
+ * NOTE! You also have to own the lock if you change
+ * the i_state of an inode while it is in use..
+ */
+-static spinlock_t inode_lock = SPIN_LOCK_UNLOCKED;
++spinlock_t inode_lock = SPIN_LOCK_UNLOCKED;
++EXPORT_SYMBOL(inode_lock);
+
+ /*
+ * Statistics gathering..
+--- linux-2.4.19-hp2_pnnl4_Lv13/fs/Makefile.iod-export 2003-02-27 14:28:01.000000000 -0800
++++ linux-2.4.19-hp2_pnnl4_Lv13/fs/Makefile 2003-03-03 13:56:11.000000000 -0800
+@@ -7,7 +7,7 @@
+
+ O_TARGET := fs.o
+
+-export-objs := filesystems.o open.o dcache.o buffer.o dquot.o
++export-objs := filesystems.o open.o dcache.o buffer.o dquot.o inode.o
+ mod-subdirs := nls xfs
+
+ obj-y := open.o read_write.o devices.o file_table.o buffer.o \
+--- linux-2.4.19-hp2_pnnl4_Lv13/mm/page_alloc.c.iod-export 2003-02-27 14:28:01.000000000 -0800
++++ linux-2.4.19-hp2_pnnl4_Lv13/mm/page_alloc.c 2003-03-03 13:54:59.000000000 -0800
+@@ -28,6 +28,7 @@
+ LIST_HEAD(inactive_list);
+ LIST_HEAD(active_list);
+ pg_data_t *pgdat_list;
++EXPORT_SYMBOL(pgdat_list);
+
+ /* Used to look up the address of the struct zone encoded in page->zone */
+ zone_t *zone_table[MAX_NR_ZONES*MAX_NR_NODES];
lustre_version.patch
vfs_intent_hp.patch
invalidate_show.patch
+iod-stock-24-exports_hp.patch
+++ /dev/null
-dev_read_only.patch
-exports.patch
-kmem_cache_validate.patch
-lustre_version.patch
-uml_check_get_page.patch
-uml_no_panic.patch
-vfs_intent.patch
-uml_compile_fixes.patch
-invalidate_show.patch
#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
-#include <linux/module.h>
-#include <linux/slab.h>
-#include <linux/init.h>
-#else
-#include <liblustre.h>
+# include <linux/module.h>
+# include <linux/slab.h>
+# include <linux/init.h>
+#else
+# include <liblustre.h>
#endif
#include <linux/lustre_dlm.h>
#include <linux/obd_class.h>
-
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
req->rq_level = LUSTRE_CONN_RECOVD;
+ req->rq_timeout = 2;
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT || rc == -EINTR) {
ldlm_del_waiting_lock(lock);
req->rq_replen = lustre_msg_size(0, NULL);
req->rq_level = LUSTRE_CONN_RECOVD;
+ req->rq_timeout = 2;
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT || rc == -EINTR) {
ldlm_del_waiting_lock(lock);
RETURN(0);
}
-struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
- struct lustre_handle *handle);
-
-static int ldlm_handle_bl_callback(struct ptlrpc_request *req,
- struct ldlm_namespace *ns)
+static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct ldlm_request *dlm_req,
+ struct ldlm_lock *lock)
{
- struct ldlm_request *dlm_req;
- struct ldlm_lock *lock;
int do_ast;
ENTRY;
- OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_AST, 0);
-
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-
- lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
- if (!lock) {
- CDEBUG(D_INFO, "blocking callback on lock "LPX64
- " - lock disappeared\n", dlm_req->lock_handle1.cookie);
- RETURN(-EINVAL);
- }
-
+ /* Try to narrow down this damn iozone bug */
+ if (lock->l_resource == NULL)
+ CERROR("lock %p resource NULL\n", lock);
+ if (lock->l_resource->lr_type != LDLM_EXTENT)
+ if (lock->l_resource->lr_namespace != ns)
+ CERROR("lock %p namespace %p != passed ns %p\n", lock,
+ lock->l_resource->lr_namespace, ns);
LDLM_DEBUG(lock, "client blocking AST callback handler START");
l_lock(&ns->ns_lock);
LDLM_DEBUG(lock, "client blocking callback handler END");
LDLM_LOCK_PUT(lock);
- RETURN(0);
+ EXIT;
}
-static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
- struct ldlm_namespace *ns)
+static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct ldlm_request *dlm_req,
+ struct ldlm_lock *lock)
{
- struct list_head ast_list = LIST_HEAD_INIT(ast_list);
- struct ldlm_request *dlm_req;
- struct ldlm_lock *lock;
+ LIST_HEAD(ast_list);
ENTRY;
- OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_CP_AST, 0);
-
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-
- lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
- if (!lock) {
- CERROR("completion callback on lock "LPX64" - lock "
- "disappeared\n", dlm_req->lock_handle1.cookie);
- RETURN(-EINVAL);
- }
-
LDLM_DEBUG(lock, "client completion callback handler START");
l_lock(&ns->ns_lock);
LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
lock);
- RETURN(0);
+ EXIT;
+}
+
+static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
+{
+ req->rq_status = rc;
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ if (rc)
+ return rc;
+ return ptlrpc_reply(req->rq_svc, req);
}
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
struct ldlm_namespace *ns;
+ struct ldlm_request *dlm_req;
+ struct ldlm_lock *lock;
int rc;
ENTRY;
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
- rc = -ENOTCONN;
- goto out;
+ ldlm_callback_reply(req, -ENOTCONN);
+ RETURN(0);
+ }
+
+ if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
+ } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
+ } else {
+ ldlm_callback_reply(req, -EIO);
+ RETURN(0);
}
LASSERT(req->rq_export != NULL);
ns = req->rq_export->exp_obd->obd_namespace;
LASSERT(ns != NULL);
+ dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
+ if (!lock) {
+ CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
+ dlm_req->lock_handle1.cookie);
+ ldlm_callback_reply(req, -EINVAL);
+ RETURN(0);
+ }
+
+ /* we want the ost thread to get this reply so that it can respond
+ * to ost requests (write cache writeback) that might be triggered
+ * in the callback */
+ ldlm_callback_reply(req, 0);
+
switch (req->rq_reqmsg->opc) {
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
- rc = ldlm_handle_bl_callback(req, ns);
+ ldlm_handle_bl_callback(req, ns, dlm_req, lock);
break;
case LDLM_CP_CALLBACK:
CDEBUG(D_INODE, "completion ast\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
- rc = ldlm_handle_cp_callback(req, ns);
+ ldlm_handle_cp_callback(req, ns, dlm_req, lock);
break;
- default:
- CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
- RETURN(-EINVAL);
}
- out:
- req->rq_status = rc;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc)
- RETURN(rc);
- ptlrpc_reply(req->rq_svc, req);
RETURN(0);
}
req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
- CERROR("--> ignoring this error as a temporary workaround! "
- "beware!\n");
- //RETURN(-ENOTCONN);
+ RETURN(-ENOTCONN);
}
switch (req->rq_reqmsg->opc) {
#include <linux/obd_ost.h>
#include <linux/lustre_net.h>
-void ost_pack_ioo(struct obd_ioobj **tmp, struct lov_stripe_md *lsm,int bufcnt)
+void ost_pack_ioo(struct obd_ioobj *ioo, struct lov_stripe_md *lsm, int bufcnt)
{
- struct obd_ioobj *ioo = *tmp;
- void *p = *tmp;
-
ioo->ioo_id = HTON__u64(lsm->lsm_object_id);
ioo->ioo_gr = HTON__u64(0);
ioo->ioo_type = HTON__u32(S_IFREG);
ioo->ioo_bufcnt = HTON__u32(bufcnt);
- *tmp = p + sizeof(*ioo);
}
-void ost_unpack_ioo(struct obd_ioobj **tmp, struct obd_ioobj **ioop)
+void ost_unpack_ioo(struct obd_ioobj *dst, struct obd_ioobj *src)
{
- void *p = *tmp;
- struct obd_ioobj *ioo = *tmp;
- *ioop = *tmp;
-
- ioo->ioo_id = NTOH__u64(ioo->ioo_id);
- ioo->ioo_gr = NTOH__u64(ioo->ioo_gr);
- ioo->ioo_type = NTOH__u32(ioo->ioo_type);
- ioo->ioo_bufcnt = NTOH__u32(ioo->ioo_bufcnt);
- *tmp = p + sizeof(*ioo);
+ dst->ioo_id = NTOH__u64(src->ioo_id);
+ dst->ioo_gr = NTOH__u64(src->ioo_gr);
+ dst->ioo_type = NTOH__u32(src->ioo_type);
+ dst->ioo_bufcnt = NTOH__u32(src->ioo_bufcnt);
}
-void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
- __u32 xid)
+void ost_pack_niobuf(struct niobuf_remote *nb, __u64 offset, __u32 len,
+ __u32 flags, __u32 xid)
{
- struct niobuf_remote *nb = *tmp;
- char *c = *tmp;
-
nb->offset = HTON__u64(offset);
nb->len = HTON__u32(len);
- nb->flags = HTON__u32(flags);
nb->xid = HTON__u32(xid);
- *tmp = c + sizeof(*nb);
+ nb->flags = HTON__u32(flags);
}
-void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp)
+void ost_unpack_niobuf(struct niobuf_remote *dst, struct niobuf_remote *src)
{
- char *c = *tmp;
- struct niobuf_remote *nb = *tmp;
-
- *nbp = *tmp;
-
- nb->offset = NTOH__u64(nb->offset);
- nb->len = NTOH__u32(nb->len);
- nb->flags = NTOH__u32(nb->flags);
-
- *tmp = c + sizeof(*nb);
+ dst->offset = NTOH__u64(src->offset);
+ dst->len = NTOH__u32(src->len);
+ dst->xid = NTOH__u32(src->xid);
+ dst->flags = NTOH__u32(src->flags);
}
dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
- dlmimp->imp_obd = /* LDLM! */ NULL;
+ dlmimp->imp_obd = target;
dlmimp->imp_recover = NULL;
INIT_LIST_HEAD(&dlmimp->imp_replay_list);
INIT_LIST_HEAD(&dlmimp->imp_sending_list);
struct ptlrpc_request, rq_list);
if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
+ struct l_wait_info lwi = { 0 };
spin_unlock_bh(&obd->obd_processing_task_lock);
CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
LPD64")\n",
obd->obd_next_recovery_transno,
req->rq_reqmsg->transno);
- wait_event(obd->obd_next_transno_waitq,
- check_for_next_transno(obd));
+ l_wait_event(obd->obd_next_transno_waitq,
+ check_for_next_transno(obd), &lwi);
spin_lock_bh(&obd->obd_processing_task_lock);
if (obd->obd_flags & OBD_ABORT_RECOVERY) {
target_abort_recovery(obd);
CFLAGS:=-g -O2 -I$(top_srcdir)/utils -I$(PORTALS)/include -I$(srcdir)/../include -Wall -L$(PORTALSLIB)
KFLAGS:=
-CPPFLAGS = $(HAVE_LIBREADLINE)
-LIBS=
+CPPFLAGS = $(HAVE_EFENCE)
+LIBS = $(LIBEFENCE)
LLIBS= ../lov/liblov.a ../obdecho/libobdecho.a ../osc/libosc.a ../ldlm/libldlm.a ../ptlrpc/libptlrpc.a ../obdclass/liblustreclass.a
libtest_LDADD := $(LIBREADLINE) $(LLIBS) \
$(PORTALS)/user/procbridge/libprocbridge.a $(PORTALS)/user/tcpnal/libtcpnal.a \
- $(PORTALS)/user/util/libtcpnalutil.a $(PORTALS)/user/$(PORTALS)/api/libptlapi.a \
- $(PORTALS)/lib/libptllib.a -lptlctl -lpthread -lefence
+ $(PORTALS)/user/util/libtcpnalutil.a $(PORTALS)/api/libptlapi.a \
+ $(PORTALS)/lib/libptllib.a -lptlctl -lpthread
bin_PROGRAMS = libtest
libtest_SOURCES = libtest.c
#include <liblustre.h>
#include <linux/obd.h>
#include <linux/obd_class.h>
-#include <../user/procbridge/procbridge.h>
+#include <portals/procbridge.h>
ptl_handle_ni_t tcpnal_ni;
modulefs_DATA = llite.o
EXTRA_PROGRAMS = llite
-llite_SOURCES = dcache.c commit_callback.c super.c rw.c super25.c
+llite_SOURCES = dcache.c commit_callback.c super.c rw.c iod.c super25.c
llite_SOURCES += file.c dir.c sysctl.c symlink.c
llite_SOURCES += recover.c namei.c lproc_llite.c
/* And now, loop forever on requests */
while (1) {
- wait_event(sbi->ll_commitcbd_waitq,
- ll_commitcbd_check_event(sbi));
+ struct l_wait_info lwi = { 0 };
+ l_wait_event(sbi->ll_commitcbd_waitq,
+ ll_commitcbd_check_event(sbi), &lwi);
spin_lock(&sbi->ll_commitcbd_lock);
if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING) {
int ll_commitcbd_setup(struct ll_sb_info *sbi)
{
int rc;
+ struct l_wait_info lwi = { 0 };
ENTRY;
rc = kernel_thread(ll_commitcbd_main, (void *) sbi,
CERROR("cannot start thread\n");
RETURN(rc);
}
- wait_event(sbi->ll_commitcbd_ctl_waitq,
- sbi->ll_commitcbd_flags & LL_COMMITCBD_RUNNING);
+ l_wait_event(sbi->ll_commitcbd_ctl_waitq,
+ sbi->ll_commitcbd_flags & LL_COMMITCBD_RUNNING, &lwi);
RETURN(0);
}
int ll_commitcbd_cleanup(struct ll_sb_info *sbi)
{
+ struct l_wait_info lwi = { 0 };
sbi->ll_commitcbd_flags = LL_COMMITCBD_STOPPING;
wake_up(&sbi->ll_commitcbd_waitq);
- wait_event(sbi->ll_commitcbd_ctl_waitq,
- sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPED);
+ l_wait_event(sbi->ll_commitcbd_ctl_waitq,
+ sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPED, &lwi);
RETURN(0);
}
if (!fd) /* no process opened the file after an mcreate */
RETURN(rc = 0);
+ /* we might not be able to get a valid handle on this file
+ * again so we really want to flush our write cache.. */
+ filemap_fdatasync(inode->i_mapping);
+ filemap_fdatawait(inode->i_mapping);
+
if (lsm != NULL) {
memset(&oa, 0, sizeof(oa));
oa.o_id = lsm->lsm_object_id;
RETURN(-ENOMEM);
oa->o_id = lsm->lsm_object_id;
oa->o_mode = S_IFREG;
- oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
- OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+ oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLBLOCKS |
+ OBD_MD_FLMTIME | OBD_MD_FLCTIME);
rc = obd_open(conn, oa, lsm, NULL);
if (rc)
GOTO(out, rc);
file->f_flags &= ~O_LOV_DELAY_CREATE;
- obdo_to_inode(inode, oa, (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
- OBD_MD_FLMTIME | OBD_MD_FLCTIME));
+ obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS | OBD_MD_FLMTIME |
+ OBD_MD_FLCTIME);
- if (oa->o_valid |= OBD_MD_FLHANDLE)
+ if (oa->o_valid & OBD_MD_FLHANDLE)
memcpy(fd->fd_ostdata, obdo_handle(oa), FD_OSTDATA_SIZE);
EXIT;
return rc;
}
-int ll_size_lock(struct inode *inode, struct lov_stripe_md *lsm, obd_off start,
- int mode, struct lustre_handle *lockh)
+/*
+ * really does the getattr on the inode and updates its fields
+ */
+int ll_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm,
+ char *ostdata)
+{
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct obdo oa;
+ int rc;
+ ENTRY;
+
+ LASSERT(lsm);
+ LASSERT(sbi);
+
+ memset(&oa, 0, sizeof oa);
+ oa.o_id = lsm->lsm_object_id;
+ oa.o_mode = S_IFREG;
+ oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
+ OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+ if (ostdata != NULL) {
+ memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE);
+ oa.o_valid |= OBD_MD_FLHANDLE;
+ }
+
+ rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
+ if (rc)
+ RETURN(rc);
+
+ obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+ OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+
+ CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lu\n", lsm->lsm_object_id,
+ inode->i_size, inode->i_size);
+ RETURN(0);
+}
+
+/*
+ * we've acquired a lock and need to see if we should perform a getattr
+ * to update the file size that may have been updated by others that had
+ * their locks canceled.
+ */
+static int ll_size_validate(struct inode *inode, struct lov_stripe_md *lsm,
+ char *ostdata, struct ldlm_extent *extent)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ int rc = 0;
+ ENTRY;
+
+ if (test_bit(LLI_F_DID_GETATTR, &lli->lli_flags))
+ RETURN(0);
+
+ down(&lli->lli_getattr_sem);
+
+ if (!test_bit(LLI_F_DID_GETATTR, &lli->lli_flags)) {
+ rc = ll_inode_getattr(inode, lsm, ostdata);
+ if ( rc == 0 )
+ set_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
+ }
+
+ up(&lli->lli_getattr_sem);
+ RETURN(rc);
+}
+
+/*
+ * some callers, notably truncate, really don't want i_size set based
+ * on the size returned by the getattr, or lock acquisition in
+ * the future.
+ */
+int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
+ struct lov_stripe_md *lsm,
+ int mode, struct ldlm_extent *extent,
+ struct lustre_handle *lockh)
{
struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ldlm_extent extent;
int rc, flags = 0;
ENTRY;
+ LASSERT(lockh->addr == 0 && lockh->cookie == 0);
+
/* XXX phil: can we do this? won't it screw the file size up? */
- if (sbi->ll_flags & LL_SBI_NOLCK)
+ if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
+ (sbi->ll_flags & LL_SBI_NOLCK))
RETURN(0);
- extent.start = start;
- extent.end = OBD_OBJECT_EOF;
+ CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
+ inode->i_ino, extent->start, extent->end);
- rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent,
+ rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, extent,
sizeof(extent), mode, &flags, ll_lock_callback,
inode, sizeof(*inode), lockh);
+
RETURN(rc);
}
-
-int ll_size_unlock(struct inode *inode, struct lov_stripe_md *lsm, int mode,
+/*
+ * this grabs a lock and manually implements behaviour that makes it look
+ * like the OST is returning the file size with each lock acquisition
+ */
+int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
+ struct lov_stripe_md *lsm,
+ int mode, struct ldlm_extent *extent,
struct lustre_handle *lockh)
{
- struct ll_sb_info *sbi = ll_i2sbi(inode);
int rc;
ENTRY;
- /* XXX phil: can we do this? won't it screw the file size up? */
- if (sbi->ll_flags & LL_SBI_NOLCK)
- RETURN(0);
+ rc = ll_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh);
- rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
- if (rc != ELDLM_OK) {
- CERROR("lock cancel: %d\n", rc);
- LBUG();
+ if (rc == ELDLM_OK) {
+ rc = ll_size_validate(inode, lsm, fd ? fd->fd_ostdata : NULL,
+ extent);
+ if ( rc != 0 ) {
+ ll_extent_unlock(fd, inode, lsm, mode, lockh);
+ rc = ELDLM_GETATTR_ERROR;
+ }
}
RETURN(rc);
}
-/* This function is solely "sampling" the file size, and does not explicit
- * locking on the size itself (see ll_size_lock() and ll_size_unlock()).
- *
- * XXX We need to optimize away the obd_getattr for decent performance here,
- * by checking if we already have the size lock and considering our size
- * authoritative in that case. In order to do that either the act of
- * getting the size lock includes retrieving the file size, or the client
- * keeps an atomic flag in the inode which indicates whether the size
- * has been updated (see bug 280).
- */
-int ll_file_size(struct inode *inode, struct lov_stripe_md *lsm, char *ostdata)
+int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
+ struct lov_stripe_md *lsm, int mode,
+ struct lustre_handle *lockh)
{
struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct obdo oa;
int rc;
ENTRY;
- LASSERT(lsm);
- LASSERT(sbi);
-
- memset(&oa, 0, sizeof oa);
- oa.o_id = lsm->lsm_object_id;
- oa.o_mode = S_IFREG;
- oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
- OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
-
- if (ostdata != NULL) {
- memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE);
- oa.o_valid |= OBD_MD_FLHANDLE;
- }
+ /* XXX phil: can we do this? won't it screw the file size up? */
+ if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
+ (sbi->ll_flags & LL_SBI_NOLCK))
+ RETURN(0);
- rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
- if (!rc) {
- obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
- OBD_MD_FLMTIME | OBD_MD_FLCTIME);
- CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lx\n",
- lsm->lsm_object_id, inode->i_size, inode->i_size);
- }
+ rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
RETURN(rc);
}
void *data, int flag)
{
struct inode *inode = data;
+ struct ll_inode_info *lli = ll_i2info(inode);
struct lustre_handle lockh = { 0, 0 };
int rc;
ENTRY;
CERROR("ldlm_cli_cancel failed: %d\n", rc);
break;
case LDLM_CB_CANCELING:
+ /* FIXME: we could be given 'canceling intents' so that we
+ * could know to write-back or simply throw away the pages
+ * based on if the cancel comes from a desire to, say,
+ * read or truncate.. */
CDEBUG(D_INODE, "invalidating obdo/inode %lu\n", inode->i_ino);
- /* FIXME: do something better than throwing away everything */
- //down(&inode->i_sem);
- ll_invalidate_inode_pages(inode);
- //up(&inode->i_sem);
+ filemap_fdatasync(inode->i_mapping);
+ filemap_fdatawait(inode->i_mapping);
+ clear_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
+ truncate_inode_pages(inode->i_mapping, 0);
break;
default:
LBUG();
{
struct ll_file_data *fd = filp->private_data;
struct inode *inode = filp->f_dentry->d_inode;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct lov_stripe_md *lsm = lli->lli_smd;
struct lustre_handle lockh = { 0, 0 };
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
- int flags = 0;
+ struct ll_read_extent rextent;
ldlm_error_t err;
ssize_t retval;
ENTRY;
-
CDEBUG(D_VFSTRACE, "VFS Op\n");
- if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
- !(sbi->ll_flags & LL_SBI_NOLCK)) {
- struct ldlm_extent extent;
- extent.start = *ppos;
- extent.end = *ppos + count - 1;
- CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
- inode->i_ino, extent.start, extent.end);
-
- err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
- &extent, sizeof(extent), LCK_PR, &flags,
- ll_lock_callback, inode, sizeof(*inode),
- &lockh);
- if (err != ELDLM_OK) {
- CERROR("lock enqueue: err: %d\n", err);
- RETURN(err);
- }
- }
- /* If we don't refresh the file size, generic_file_read may not even
- * call ll_readpage */
- retval = ll_file_size(inode, lsm, fd->fd_ostdata);
- if (retval < 0) {
- CERROR("ll_file_size: "LPSZ"\n", retval);
+ /* "If nbyte is 0, read() will return 0 and have no other results."
+ * -- Single Unix Spec */
+ if (count == 0)
+ RETURN(0);
+
+ rextent.re_extent.start = *ppos;
+ rextent.re_extent.end = *ppos + count - 1;
+
+ err = ll_extent_lock(fd, inode, lsm,
+ LCK_PR, &rextent.re_extent, &lockh);
+ if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+ retval = -ENOLCK;
RETURN(retval);
}
+ /* XXX tell ll_readpage what pages have a PR lock.. */
+ rextent.re_task = current;
+ spin_lock(&lli->lli_read_extent_lock);
+ list_add(&rextent.re_lli_item, &lli->lli_read_extents);
+ spin_unlock(&lli->lli_read_extent_lock);
+
CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
inode->i_ino, count, *ppos);
retval = generic_file_read(filp, buf, count, ppos);
+ spin_lock(&lli->lli_read_extent_lock);
+ list_del(&rextent.re_lli_item);
+ spin_unlock(&lli->lli_read_extent_lock);
+
if (retval > 0)
ll_update_atime(inode);
- if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
- !(sbi->ll_flags & LL_SBI_NOLCK)) {
- err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, &lockh);
- if (err != ELDLM_OK) {
- CERROR("lock cancel: err: %d\n", err);
- retval = err;
- }
- }
-
+ /* XXX errors? */
+ ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
RETURN(retval);
}
{
struct ll_file_data *fd = file->private_data;
struct inode *inode = file->f_dentry->d_inode;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct lustre_handle lockh = { 0, 0 }, eof_lockh = { 0, 0 };
+ struct lustre_handle lockh = { 0, 0 };
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
- int flags = 0;
+ struct ldlm_extent extent;
ldlm_error_t err;
ssize_t retval;
ENTRY;
/* POSIX, but surprised the VFS doesn't check this already */
if (count == 0)
- return 0;
+ RETURN(0);
CDEBUG(D_VFSTRACE, "VFS Op\n");
if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
- err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockh);
- if (err)
- RETURN(err);
-
- /* Get size here so we know extent to enqueue write lock on. */
- retval = ll_file_size(inode, lsm, fd->fd_ostdata);
- if (retval)
- GOTO(out_eof, retval);
-
- *ppos = inode->i_size;
- }
-
- if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
- !(sbi->ll_flags & LL_SBI_NOLCK)) {
- struct ldlm_extent extent;
+ extent.start = 0;
+ extent.end = OBD_OBJECT_EOF;
+ } else {
extent.start = *ppos;
extent.end = *ppos + count - 1;
- CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
- inode->i_ino, extent.start, extent.end);
-
- err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
- &extent, sizeof(extent), LCK_PW, &flags,
- ll_lock_callback, inode, sizeof(*inode),
- &lockh);
- if (err != ELDLM_OK) {
- CERROR("lock enqueue: err: %d\n", err);
- GOTO(out_eof, retval = err);
- }
}
+ err = ll_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh);
+ if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+ retval = -ENOLCK;
+ RETURN(retval);
+ }
+
+ if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND)
+ *ppos = inode->i_size;
+
CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
inode->i_ino, count, *ppos);
retval = generic_file_write(file, buf, count, ppos);
- if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
- !(sbi->ll_flags & LL_SBI_NOLCK)) {
- err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, &lockh);
- if (err != ELDLM_OK)
- CERROR("lock cancel: err: %d\n", err);
- }
-
- EXIT;
- out_eof:
- if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
- err = ll_size_unlock(inode, lsm, LCK_PW, &eof_lockh);
- if (err)
- CERROR("ll_size_unlock: %d\n", err);
- }
-
- return retval;
+ /* XXX errors? */
+ ll_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
+ RETURN(retval);
}
static int ll_lov_setstripe(struct inode *inode, struct file *file,
loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
{
struct inode *inode = file->f_dentry->d_inode;
- long long retval;
+ struct ll_file_data *fd = file->private_data;
+ struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+ struct lustre_handle lockh = {0, 0};
+ loff_t retval;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op\n");
- switch (origin) {
- case 2: {
- struct ll_inode_info *lli = ll_i2info(inode);
- struct ll_file_data *fd = file->private_data;
-
- retval = ll_file_size(inode, lli->lli_smd, fd->fd_ostdata);
- if (retval)
+ if (origin == 2) { /* SEEK_END */
+ ldlm_error_t err;
+ struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+ err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
+ if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+ retval = -ENOLCK;
RETURN(retval);
+ }
offset += inode->i_size;
- break;
- }
- case 1:
+ } else if (origin == 1) { /* SEEK_CUR */
offset += file->f_pos;
}
+
retval = -EINVAL;
if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
if (offset != file->f_pos) {
}
retval = offset;
}
+
+ if (origin == 2)
+ ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
RETURN(retval);
}
-/* XXX this does not need to do anything for data, it _does_ need to
- call setattr */
int ll_fsync(struct file *file, struct dentry *dentry, int data)
{
- return 0;
+ int ret;
+ ENTRY;
+
+ /*
+ * filemap_fdata{sync,wait} are also called at PW lock cancelation so
+ * we know that they can only find data to writeback here if we are
+ * still holding the PW lock that covered the dirty pages. XXX we
+ * should probably get a reference on it, though, just to be clear.
+ */
+ ret = filemap_fdatasync(dentry->d_inode->i_mapping);
+ if ( ret == 0 )
+ ret = filemap_fdatawait(dentry->d_inode->i_mapping);
+
+ RETURN(ret);
}
int ll_inode_revalidate(struct dentry *dentry)
if (!lsm) /* object not yet allocated, don't validate size */
RETURN(0);
- /* XXX this should probably become an unconditional obd_getattr()
- * so that we update the blocks count and mtime from the OST too.
+ /*
+ * unfortunately stat comes in through revalidate and we don't
+ * differentiate this use from initial instantiation. we're
+ * also being wildly conservative and flushing write caches
+ * so that stat really returns the proper size.
*/
- RETURN(ll_file_size(inode, lsm, NULL));
+ {
+ struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+ struct lustre_handle lockh = {0, 0};
+ ldlm_error_t err;
+
+ err = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
+ if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED )
+ RETURN(-abs(err)); /* XXX can't be right */
+
+ ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
+ }
+ RETURN(0);
}
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
release: ll_file_release,
mmap: generic_file_mmap,
llseek: ll_file_seek,
- fsync: NULL
+ fsync: ll_fsync,
};
struct inode_operations ll_file_inode_operations = {
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Copyright (C) 2002, 2003 Cluster File Systems, Inc
+ *
+ * this started as an implementation of an io daemon that woke regularly
+ * to force writeback.. the throttling in prepare_write and kupdate's usual
+ * writeback pressure got rid of our thread, but the file name remains.
+ */
+#include <linux/version.h>
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/stat.h>
+#include <linux/sched.h>
+#include <linux/smp_lock.h>
+#include <linux/kmod.h>
+#include <linux/pagemap.h>
+#include <linux/mm.h>
+
+/* PG_inactive_clean is shorthand for rmap, we want free_high/low here.. */
+#ifdef PG_inactive_clean
+#include <linux/mm_inline.h>
+#endif
+
+#define DEBUG_SUBSYSTEM S_LLITE
+#include <linux/lustre_lite.h>
+
+#ifndef list_for_each_prev_safe
+#define list_for_each_prev_safe(pos, n, head) \
+ for (pos = (head)->prev, n = pos->prev; pos != (head); \
+ pos = n, n = pos->prev )
+#endif
+
+extern spinlock_t inode_lock;
+
+#define LLWP_MAX_PAGES (PTL_MD_MAX_IOV)
+/*
+ * one batch of dirty pages headed for the OST in a single obd_brw call.
+ *
+ * has_whole_pages: set once the batch holds at least one full PAGE_SIZE page.
+ * num_frags:       counted as 1 for all whole pages together plus 1 per
+ *                  partial (end-of-file) page; llwp_consume_page stops
+ *                  accepting pages once this reaches 3.
+ * num_pages:       number of entries used in pgs[].
+ */
+struct ll_writeback_pages {
+        unsigned has_whole_pages:1,
+                 num_frags:2,
+                 num_pages:29;
+        struct brw_page pgs[LLWP_MAX_PAGES];
+};
+
+
+/*
+ * ugh, we want disk allocation on the target to happen in offset order. we'll
+ * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
+ * fine for our small page arrays and doesn't require allocation. it's an
+ * insertion sort that swaps elements that are strides apart, shrinking the
+ * stride down until it's '1' and the array is sorted.
+ *
+ * sorts the first 'num' entries of 'array' in place by ascending ->off,
+ * using O(1) extra space so it is safe on memory-pressure paths.
+ */
+void sort_brw_pages(struct brw_page *array, int num)
+{
+        int stride, i, j;
+        struct brw_page tmp;
+
+        if ( num == 1 )
+                return;
+
+        /* grow the stride through the 3h+1 sequence until it reaches num */
+        for( stride = 1; stride < num ; stride = (stride*3) +1 )
+                ;
+
+        do {
+                stride /= 3;
+                /* one insertion-sort pass over elements 'stride' apart */
+                for ( i = stride ; i < num ; i++ ) {
+                        tmp = array[i];
+                        j = i;
+                        while ( j >= stride &&
+                                array[j - stride].off > tmp.off ) {
+                                array[j] = array[j - stride];
+                                j -= stride;
+                        }
+                        array[j] = tmp;
+                }
+        } while ( stride > 1 );
+}
+
+/*
+ * returns 0 if the page was inserted in the array because it was
+ * within i_size. if we raced with truncate and i_size was less
+ * than the page we can unlock the page because truncate_inode_pages will
+ * be waiting to cleanup the page
+ *
+ * NOTE(review): 0 is also returned in the truncate-race case where
+ * nothing was inserted; -1 means "batch is full, stop feeding pages"
+ * (3 fragments or LLWP_MAX_PAGES pages reached).
+ */
+static int llwp_consume_page(struct ll_writeback_pages *llwp,
+                             struct inode *inode, struct page *page)
+{
+        obd_off off = ((obd_off)page->index) << PAGE_SHIFT;
+        struct brw_page *pg;
+
+        /* we raced with truncate? */
+        if ( off >= inode->i_size ) {
+                unlock_page(page);
+                goto out;
+        }
+
+        /* reference is dropped in ll_brw_pages_unlock after the brw */
+        page_cache_get(page);
+        pg = &llwp->pgs[llwp->num_pages];
+        llwp->num_pages++;
+
+        pg->pg = page;
+        pg->off = off;
+        pg->flag = OBD_BRW_CREATE;
+        pg->count = PAGE_SIZE;
+
+        /* catch partial writes for files that end mid-page */
+        if ( pg->off + pg->count > inode->i_size )
+                pg->count = inode->i_size & ~PAGE_MASK;
+
+        /* all whole pages together only count as one fragment */
+        if ( pg->count == PAGE_SIZE ) {
+                if ( ! llwp->has_whole_pages ) {
+                        llwp->has_whole_pages = 1;
+                        llwp->num_frags++;
+                }
+        } else {
+                llwp->num_frags++;
+        }
+
+        /*
+         * matches ptlrpc_bulk_get assert that trickles down
+         * from a 0 page length going through niobuf and into
+         * the buffer regions being posted
+         */
+        LASSERT(pg->count >= 0);
+
+        CDEBUG(D_CACHE, "brw_page %p: off "LPU64" cnt %d, page %p: ind %ld"
+               " i_size: "LPU64"\n", pg, pg->off, pg->count, page,
+               page->index, inode->i_size);
+
+        if ( llwp->num_frags == 3 || llwp->num_pages == LLWP_MAX_PAGES )
+                return -1;
+
+out:
+        return 0;
+}
+
+/*
+ * returns the number of pages that it added to the pgs array
+ *
+ * NOTE(review): despite the line above, the function returns void;
+ * callers read llwp->num_pages to learn how many pages were grabbed.
+ *
+ * this duplicates filemap_fdatasync and gives us an opportunity to grab lots
+ * of dirty pages..
+ */
+static void ll_get_dirty_pages(struct inode *inode,
+                               struct ll_writeback_pages *llwp)
+{
+        struct address_space *mapping = inode->i_mapping;
+        struct page *page;
+        struct list_head *pos, *n;
+        ENTRY;
+
+        spin_lock(&pagecache_lock);
+
+        /* walk oldest-first, stopping when llwp_consume_page says "full" */
+        list_for_each_prev_safe(pos, n, &mapping->dirty_pages) {
+                page = list_entry(pos, struct page, list);
+
+                /* skip pages someone else is already working on */
+                if (TryLockPage(page))
+                        continue;
+
+                /* move to locked_pages, mirroring filemap_fdatasync */
+                list_del(&page->list);
+                list_add(&page->list, &mapping->locked_pages);
+
+                if ( ! PageDirty(page) ) {
+                        unlock_page(page);
+                        continue;
+                }
+                ClearPageDirty(page);
+
+                if ( llwp_consume_page(llwp, inode, page) != 0)
+                        break;
+        }
+
+        spin_unlock(&pagecache_lock);
+        EXIT;
+}
+
+/*
+ * synchronously write the batched pages to the OST, then unlock them and
+ * drop the references taken in llwp_consume_page.  I/O errors are only
+ * logged here; the pages are cleaned up regardless.
+ */
+static void ll_brw_pages_unlock( struct inode *inode,
+                                 struct ll_writeback_pages *llwp)
+{
+        int rc, i;
+        struct obd_brw_set *set;
+        ENTRY;
+
+        /* issue the writes in offset order for nicer OST disk allocation */
+        sort_brw_pages(llwp->pgs, llwp->num_pages);
+
+        set = obd_brw_set_new();
+        if (set == NULL) {
+                EXIT;
+                return;
+        }
+        set->brw_callback = ll_brw_sync_wait;
+
+        rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
+                     ll_i2info(inode)->lli_smd, llwp->num_pages, llwp->pgs,
+                     set, NULL);
+        if (rc) {
+                CERROR("error from obd_brw: rc = %d\n", rc);
+        } else {
+                /* wait for the bulk transfer to complete before cleanup */
+                rc = ll_brw_sync_wait(set, CB_PHASE_START);
+                if (rc)
+                        CERROR("error from callback: rc = %d\n", rc);
+        }
+        obd_brw_set_decref(set);
+
+        /* XXX this doesn't make sense to me */
+        rc = 0;
+
+        /* release the locks and refs taken when the batch was built */
+        for ( i = 0 ; i < llwp->num_pages ; i++) {
+                struct page *page = llwp->pgs[i].pg;
+
+                CDEBUG(D_CACHE, "cleaning page %p\n", page);
+                LASSERT(PageLocked(page));
+                unlock_page(page);
+                page_cache_release(page);
+        }
+
+        EXIT;
+}
+
+#ifndef PG_inactive_clean
+#ifdef CONFIG_DISCONTIGMEM
+#error "sorry, we don't support DISCONTIGMEM yet"
+#endif
+/*
+ * __alloc_pages marks a zone as needing balancing if an allocation is
+ * performed when the zone has fewer free pages than its 'low' water
+ * mark. it's cleared when try_to_free_pages makes progress.
+ *
+ * returns 1 if any zone of any node is flagged, 0 otherwise.
+ */
+static int zones_need_balancing(void)
+{
+        pg_data_t * pgdat;
+        zone_t *zone;
+        int i;
+
+        for ( pgdat = pgdat_list ; pgdat != NULL ; pgdat = pgdat->node_next ) {
+                for ( i = pgdat->nr_zones-1 ; i >= 0 ; i-- ) {
+                        zone = &pgdat->node_zones[i];
+
+                        if ( zone->need_balance )
+                                return 1;
+                }
+        }
+        return 0;
+}
+#endif
+/* 2.4 doesn't give us a way to find out how many pages we have
+ * cached 'cause we're not using buffer_heads. we are very
+ * conservative here and flush the superblock of all dirty data
+ * when the vm (rmap or stock) thinks that it is running low
+ * and kswapd would have done work. kupdated isn't good enough
+ * because writers (dbench) can dirty _very quickly_, and we
+ * allocate under writepage..
+ *
+ * 2.5 gets this right, see the {inc,dec}_page_state(nr_dirty, )
+ *
+ * returns 1 when the VM looks short on memory and we should push
+ * dirty data out, 0 otherwise.
+ */
+static int should_writeback(void)
+{
+#ifdef PG_inactive_clean
+        /* rmap VM: any zone below its free-page watermarks? */
+        if (free_high(ALL_ZONES) > 0 || free_low(ANY_ZONE) > 0)
+#else
+        /* stock VM: __alloc_pages flagged a zone for balancing? */
+        if (zones_need_balancing())
+#endif
+                return 1;
+        return 0;
+}
+
+/*
+ * flush dirty file data for 'sb' while the VM reports memory pressure.
+ *
+ * returns the number of pages written (possibly 0), or -ENOMEM if the
+ * batch array couldn't be allocated.  runs with PF_MEMALLOC set so our
+ * own allocations can't recurse back into writeback.
+ */
+int ll_check_dirty( struct super_block *sb)
+{
+        unsigned long old_flags; /* hack? */
+        int making_progress;
+        struct ll_writeback_pages *llwp;
+        struct inode *inode;
+        int rc = 0;
+        ENTRY;
+
+        /* NOTE(review): this plain return skips RETURN()'s debug logging
+         * that pairs with ENTRY above -- presumably a deliberate fast
+         * path, but confirm */
+        if ( ! should_writeback() )
+                return 0;
+
+        old_flags = current->flags;
+        current->flags |= PF_MEMALLOC;
+        llwp = kmalloc(sizeof(struct ll_writeback_pages), GFP_ATOMIC);
+        if ( llwp == NULL )
+                GOTO(cleanup, rc = -ENOMEM);
+        /* zero only the header bitfields; pgs[] is filled before use */
+        memset(llwp, 0, offsetof(struct ll_writeback_pages, pgs));
+
+        spin_lock(&inode_lock);
+
+        /*
+         * first we try and write back dirty pages from dirty inodes
+         * until the VM thinks we're ok again..
+         */
+        do {
+                struct list_head *pos;
+                inode = NULL;
+                making_progress = 0;
+
+                /* pick the oldest inode on s_dirty that has dirty pages */
+                list_for_each_prev(pos, &sb->s_dirty) {
+                        inode = list_entry(pos, struct inode, i_list);
+
+                        if ( ! (inode->i_state & I_DIRTY_PAGES) ) {
+                                inode = NULL;
+                                continue;
+                        }
+                        break;
+                }
+
+                if ( inode == NULL )
+                        break;
+
+                /* duplicate __sync_one, *sigh* */
+                list_del(&inode->i_list);
+                list_add(&inode->i_list, &inode->i_sb->s_locked_inodes);
+                inode->i_state |= I_LOCK;
+                inode->i_state &= ~I_DIRTY_PAGES;
+
+                spin_unlock(&inode_lock);
+
+                /* drain this inode's dirty pages one batch at a time */
+                do {
+                        memset(llwp, 0, sizeof(*llwp));
+                        ll_get_dirty_pages(inode, llwp);
+                        if ( llwp->num_pages ) {
+                                ll_brw_pages_unlock(inode, llwp);
+                                rc += llwp->num_pages;
+                                making_progress = 1;
+                        }
+                } while (llwp->num_pages && should_writeback() );
+
+                spin_lock(&inode_lock);
+
+                /* pages may have been dirtied again while we wrote */
+                if ( ! list_empty(&inode->i_mapping->dirty_pages) )
+                        inode->i_state |= I_DIRTY_PAGES;
+
+                inode->i_state &= ~I_LOCK;
+                /*
+                 * we are sneaky and leave the inode on the dirty list,
+                 * even though it might not still be..
+                 */
+                if (!(inode->i_state & I_FREEING)) {
+                        list_del(&inode->i_list);
+                        list_add(&inode->i_list, &inode->i_sb->s_dirty);
+                }
+                wake_up(&inode->i_wait);
+
+        } while ( making_progress && should_writeback() );
+
+        /*
+         * and if that didn't work, we sleep on any data that might
+         * be under writeback..
+         */
+        while ( should_writeback() ) {
+                if ( list_empty(&sb->s_locked_inodes) )
+                        break;
+
+                inode = list_entry(sb->s_locked_inodes.next, struct inode,
+                                   i_list);
+
+                /* pin the inode so it can't vanish while we sleep */
+                atomic_inc(&inode->i_count); /* XXX hack? */
+                spin_unlock(&inode_lock);
+                wait_event(inode->i_wait, !(inode->i_state & I_LOCK));
+                iput(inode);
+                spin_lock(&inode_lock);
+        }
+
+        spin_unlock(&inode_lock);
+
+cleanup:
+        if ( llwp != NULL )
+                kfree(llwp);
+        current->flags = old_flags;
+
+        RETURN(rc);
+}
+
+/*
+ * write 'page' out now, opportunistically batching in any other dirty
+ * pages of the inode that ll_get_dirty_pages can grab.
+ *
+ * assumes 'page' arrives locked (ll_brw_pages_unlock asserts this and
+ * unlocks it after the write; llwp_consume_page unlocks it itself on a
+ * truncate race) -- presumably the writepage path; confirm at callers.
+ *
+ * returns 0 or -ENOMEM; brw I/O errors are only logged, not returned.
+ */
+int ll_batch_writepage( struct inode *inode, struct page *page )
+{
+        unsigned long old_flags; /* hack? */
+        struct ll_writeback_pages *llwp;
+        int rc = 0;
+        ENTRY;
+
+        /* PF_MEMALLOC keeps our allocation from recursing into writeback */
+        old_flags = current->flags;
+        current->flags |= PF_MEMALLOC;
+        llwp = kmalloc(sizeof(struct ll_writeback_pages), GFP_ATOMIC);
+        if ( llwp == NULL )
+                GOTO(cleanup, rc = -ENOMEM);
+        memset(llwp, 0, offsetof(struct ll_writeback_pages, pgs));
+
+        /* the caller's page goes in first */
+        llwp_consume_page(llwp, inode, page);
+
+        /* then top up the batch with the inode's other dirty pages */
+        ll_get_dirty_pages(inode, llwp);
+        if ( llwp->num_pages )
+                ll_brw_pages_unlock(inode, llwp);
+
+cleanup:
+        if ( llwp != NULL )
+                kfree(llwp);
+        current->flags = old_flags;
+        RETURN(rc);
+}
#include <asm/system.h>
#include <asm/uaccess.h>
+
#include <linux/fs.h>
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
#include <linux/buffer_head.h>
list_del(&page->list);
list_add(&page->list, &mapping->clean_pages);
+ /* XXX doesn't inode_lock protect i_state ? */
inode = mapping->host;
if (list_empty(&mapping->dirty_pages)) {
CDEBUG(D_INODE, "inode clean\n");
EXIT;
}
-inline void set_page_clean(struct page *page)
+void set_page_clean(struct page *page)
{
if (PageDirty(page)) {
ClearPageDirty(page);
}
/* SYNCHRONOUS I/O to object storage for an inode */
-static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
+static int ll_brw(int cmd, struct inode *inode, struct page *page, int flags)
{
struct ll_inode_info *lli = ll_i2info(inode);
struct lov_stripe_md *lsm = lli->lli_smd;
pg.count = PAGE_SIZE;
CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
- cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
- pg.off, pg.off);
+ cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
+ pg.off, pg.off);
if (pg.count == 0) {
CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
LPU64"\n",
page->mapping->host->i_size, page->index, pg.off);
}
- pg.flag = create ? OBD_BRW_CREATE : 0;
+ pg.flag = flags;
set->brw_callback = ll_brw_sync_wait;
rc = obd_brw(cmd, ll_i2obdconn(inode), lsm, 1, &pg, set, NULL);
if (rc)
CERROR("error from callback: rc = %d\n", rc);
}
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
RETURN(rc);
}
-/* returns the page unlocked, but with a reference */
-static int ll_readpage(struct file *file, struct page *page)
+/*
+ * we were asked to read a single page but we're going to try and read a batch
+ * of pages all at once. this vaguely simulates 2.5's readpages.
+ */
+static int ll_readpage(struct file *file, struct page *first_page)
{
- struct inode *inode = page->mapping->host;
- obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
- int rc = 0;
+ struct inode *inode = first_page->mapping->host;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct page *page = first_page;
+ struct list_head *pos;
+ struct brw_page *pgs;
+ struct obd_brw_set *set;
+ unsigned long end_index, extent_end = 0;
+ int npgs = 0, rc = 0;
ENTRY;
- if (!PageLocked(page))
- LBUG();
+ LASSERT(PageLocked(page));
+ LASSERT(!PageUptodate(page));
+ CDEBUG(D_VFSTRACE, "VFS Op\n");
- if (inode->i_size <= offset) {
+ if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
CERROR("reading beyond EOF\n");
memset(kmap(page), 0, PAGE_SIZE);
kunmap(page);
- GOTO(readpage_out, rc);
+ SetPageUptodate(page);
+ unlock_page(page);
+ RETURN(rc);
}
- /* XXX Workaround for BA OSTs returning short reads at EOF. The linux
- * OST will return the full page, zero-filled at the end, which
- * will just overwrite the data we set here.
- * Bug 593 relates to fixing this properly.
+ pgs = kmalloc(PTL_MD_MAX_IOV * sizeof(*pgs), GFP_USER);
+ if ( pgs == NULL )
+ RETURN(-ENOMEM);
+ set = obd_brw_set_new();
+ if ( set == NULL )
+ GOTO(out_pgs, rc = -ENOMEM);
+
+ /* arbitrarily try to read-ahead 8 times what we can pass on
+ * the wire at once, clamped to file size */
+ end_index = first_page->index +
+ 8 * ((PTL_MD_MAX_IOV * PAGE_SIZE)>>PAGE_CACHE_SHIFT);
+ if ( end_index > inode->i_size >> PAGE_CACHE_SHIFT )
+ end_index = inode->i_size >> PAGE_CACHE_SHIFT;
+
+ /*
+ * find how far we're allowed to read under the extent ll_file_read
+ * is passing us..
*/
- if (inode->i_size < offset + PAGE_SIZE) {
- int count = inode->i_size - offset;
- void *addr = kmap(page);
- //POISON(addr, 0x7c, count);
- memset(addr + count, 0, PAGE_SIZE - count);
- kunmap(page);
+ spin_lock(&lli->lli_read_extent_lock);
+ list_for_each(pos, &lli->lli_read_extents) {
+ struct ll_read_extent *rextent;
+ rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
+ if ( rextent->re_task != current )
+ continue;
+
+ if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
+ /* extent wrapping */
+ extent_end = ~0;
+ else {
+                        extent_end = ( rextent->re_extent.end + PAGE_SIZE )
+                                        >> PAGE_CACHE_SHIFT;
+                        /* 32bit indexes, 64bit extents.. */
+                        if ( ((u64)extent_end << PAGE_CACHE_SHIFT ) <
+                             rextent->re_extent.end )
+                                extent_end = ~0;
+ }
+ break;
}
+ spin_unlock(&lli->lli_read_extent_lock);
+
+ if ( extent_end == 0 ) {
+ CERROR("readpage outside ll_file_read, no lock held?\n");
+ end_index = page->index + 1;
+ } else if ( extent_end < end_index )
+ end_index = extent_end;
+
+ /* to balance the find_get_page ref the other pages get that is
+ * decrefed on teardown.. */
+ page_cache_get(page);
+ do {
+ unsigned long index ;
+
+ pgs[npgs].pg = page;
+ pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
+ pgs[npgs].flag = 0;
+ pgs[npgs].count = PAGE_SIZE;
+ /* XXX Workaround for BA OSTs returning short reads at EOF.
+ * The linux OST will return the full page, zero-filled at the
+ * end, which will just overwrite the data we set here. Bug
+ * 593 relates to fixing this properly.
+ */
+ if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
+ int count = inode->i_size - pgs[npgs].off;
+ void *addr = kmap(page);
+ pgs[npgs].count = count;
+ //POISON(addr, 0x7c, count);
+ memset(addr + count, 0, PAGE_SIZE - count);
+ kunmap(page);
+ }
+
+ npgs++;
+ if ( npgs == PTL_MD_MAX_IOV )
+ break;
+
+ /*
+ * find pages ahead of us that we can read in.
+ * grab_cache_page waits on pages that are locked so
+ * we first try find_get_page, which doesn't. this stops
+ * the worst case behaviour of racing threads waiting on
+ * each other, but doesn't remove it entirely.
+ */
+ for ( index = page->index + 1, page = NULL ;
+ page == NULL && index < end_index ; index++ ) {
+
+ /* see if the page already exists and needs updating */
+ page = find_get_page(inode->i_mapping, index);
+ if ( page ) {
+ if ( Page_Uptodate(page) || TryLockPage(page) )
+ goto out_release;
+ if ( !page->mapping || Page_Uptodate(page))
+ goto out_unlock;
+ } else {
+ /* ok, we have to create it.. */
+ page = grab_cache_page(inode->i_mapping, index);
+ if ( page == NULL )
+ continue;
+ if ( Page_Uptodate(page) )
+ goto out_unlock;
+ }
+
+ break;
+
+ out_unlock:
+ unlock_page(page);
+ out_release:
+ page_cache_release(page);
+ page = NULL;
+ }
- if (PageUptodate(page)) {
- CERROR("Explain this please?\n");
- GOTO(readpage_out, rc);
+ } while (page);
+
+ set->brw_callback = ll_brw_sync_wait;
+ rc = obd_brw(OBD_BRW_READ, ll_i2obdconn(inode),
+ ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL);
+ if (rc) {
+ CERROR("error from obd_brw: rc = %d\n", rc);
+ } else {
+ rc = ll_brw_sync_wait(set, CB_PHASE_START);
+ if (rc)
+ CERROR("error from callback: rc = %d\n", rc);
}
+ obd_brw_set_decref(set);
- CDEBUG(D_VFSTRACE, "VFS Op\n");
- rc = ll_brw(OBD_BRW_READ, inode, page, 0);
- EXIT;
+ while ( --npgs > -1 ) {
+ page = pgs[npgs].pg;
- readpage_out:
- if (!rc)
- SetPageUptodate(page);
- unlock_page(page);
- return 0;
+ if ( rc == 0 )
+ SetPageUptodate(page);
+ unlock_page(page);
+ page_cache_release(page);
+ }
+out_pgs:
+ kfree(pgs);
+ RETURN(rc);
} /* ll_readpage */
void ll_truncate(struct inode *inode)
struct obdo oa = {0};
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
struct lustre_handle lockh = { 0, 0 };
+ struct ldlm_extent extent = {inode->i_size, OBD_OBJECT_EOF};
int err;
ENTRY;
if (!lsm) {
/* object not yet allocated */
inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+ EXIT;
return;
}
CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
oa.o_id, inode->i_size);
- err = ll_size_lock(inode, lsm, inode->i_size, LCK_PW, &lockh);
- if (err) {
- CERROR("ll_size_lock failed: %d\n", err);
+ /* i_size has already been set to the new size */
+ err = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW,
+ &extent, &lockh);
+ if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+ EXIT;
return;
}
else
obdo_to_inode(inode, &oa, oa.o_valid);
- err = ll_size_unlock(inode, lsm, LCK_PW, &lockh);
+ err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
if (err)
- CERROR("ll_size_unlock failed: %d\n", err);
+ CERROR("ll_extent_unlock failed: %d\n", err);
EXIT;
return;
struct inode *inode = page->mapping->host;
obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
int rc = 0;
- char *addr;
ENTRY;
- addr = kmap(page);
- LASSERT(PageLocked(page));
+ ll_check_dirty(inode->i_sb);
+
+ if (!PageLocked(page))
+ LBUG();
if (PageUptodate(page))
RETURN(0);
RETURN(0);
CDEBUG(D_VFSTRACE, "VFS Op\n");
- /* If are writing to a new page, no need to read old data. If we
- * haven't already gotten the file size in ll_file_write() since
- * we got our extent lock, we need to verify it here before we
- * overwrite some other node's write (bug 445).
- */
+ /* If are writing to a new page, no need to read old data.
+ * the extent locking and getattr procedures in ll_file_write have
+ * guaranteed that i_size is stable enough for our zeroing needs */
if (inode->i_size <= offset) {
- if (!S_ISBLK(inode->i_mode) && !(file->f_flags & O_APPEND)) {
- struct ll_file_data *fd = file->private_data;
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-
- rc = ll_file_size(inode, lsm, fd->fd_ostdata);
- if (rc)
- GOTO(prepare_done, rc);
- }
- if (inode->i_size <= offset) {
- memset(addr, 0, PAGE_SIZE);
- GOTO(prepare_done, rc=0);
- }
+ memset(kmap(page), 0, PAGE_SIZE);
+ kunmap(page);
+ GOTO(prepare_done, rc = 0);
}
rc = ll_brw(OBD_BRW_READ, inode, page, 0);
EXIT;
prepare_done:
- if (!rc)
+ if (rc == 0)
SetPageUptodate(page);
- else
- kunmap (page);
return rc;
}
-/* Write a page from kupdated or kswapd.
+/*
+ * background file writeback. This is called regularly from kupdated to write
+ * dirty data, from kswapd when memory is low, and from filemap_fdatasync when
+ * super blocks or inodes are synced..
*
- * We unlock the page even in the face of an error, otherwise dirty
- * pages could OOM the system if they cannot be written. Also, there
- * is nobody to return an error code to from here - the application
- * may not even be running anymore.
+ * obd_brw errors down in _batch_writepage are ignored, so pages are always
+ * unlocked. Also, there is nobody to return an error code to from here - the
+ * application may not even be running anymore.
*
- * Returns the page unlocked, but with a reference.
+ * this should be async so that things like kswapd can have a chance to
+ * free some more pages that our allocating writeback may need, but it isn't
+ * yet.
*/
-static int ll_writepage(struct page *page) {
+static int ll_writepage(struct page *page)
+{
struct inode *inode = page->mapping->host;
- int err;
ENTRY;
- LASSERT(PageLocked(page));
-
- /* XXX need to make sure we have LDLM lock on this page */
+ CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
+ PageLaunder(page), inode);
CDEBUG(D_VFSTRACE, "VFS Op\n");
- err = ll_brw(OBD_BRW_WRITE, inode, page, 1);
- if (err)
- CERROR("ll_brw failure %d\n", err);
- else
- set_page_clean(page);
+ LASSERT(PageLocked(page));
- unlock_page(page);
- RETURN(err);
+ /* XXX should obd_brw errors trickle up? */
+ ll_batch_writepage(inode, page);
+ RETURN(0);
}
-
-/* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated
- * too */
+/*
+ * we really don't want to start writeback here, we want to give callers some
+ * time to further dirty the pages before we write them out.
+ */
static int ll_commit_write(struct file *file, struct page *page,
unsigned from, unsigned to)
{
struct inode *inode = page->mapping->host;
- struct ll_inode_info *lli = ll_i2info(inode);
- struct lov_stripe_md *md = lli->lli_smd;
- struct brw_page pg;
- struct obd_brw_set *set;
- int rc, create = 1;
loff_t size;
ENTRY;
- pg.pg = page;
- pg.count = to;
- /* XXX make the starting offset "from" */
- pg.off = (((obd_off)page->index) << PAGE_SHIFT);
- pg.flag = create ? OBD_BRW_CREATE : 0;
-
- set = obd_brw_set_new();
- if (set == NULL)
- RETURN(-ENOMEM);
-
- SetPageUptodate(page);
-
- if (!PageLocked(page))
- LBUG();
+ LASSERT(inode == file->f_dentry->d_inode);
+ LASSERT(PageLocked(page));
CDEBUG(D_VFSTRACE, "VFS Op\n");
- CDEBUG(D_INODE, "commit_page writing (off "LPD64"), count %d\n",
- pg.off, pg.count);
+ CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
+ inode, page, from, to, page->index);
- set->brw_callback = ll_brw_sync_wait;
- rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, 1, &pg, set, NULL);
- if (rc)
- CERROR("error from obd_brw: rc = %d\n", rc);
- else {
- rc = ll_brw_sync_wait(set, CB_PHASE_START);
- if (rc)
- CERROR("error from callback: rc = %d\n", rc);
- }
- obd_brw_set_free(set);
- kunmap(page);
+ /* to match full page case in prepare_write */
+ SetPageUptodate(page);
+ /* mark the page dirty, put it on mapping->dirty,
+ * mark the inode PAGES_DIRTY, put it on sb->dirty */
+ set_page_dirty(page);
- size = pg.off + pg.count;
- /* do NOT truncate when writing in the middle of a file */
+ /* this is matched by a hack in obdo_to_inode at the moment */
+ size = (((obd_off)page->index) << PAGE_SHIFT) + to;
if (size > inode->i_size)
inode->i_size = size;
- RETURN(rc);
+ RETURN(0);
} /* ll_commit_write */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
if (!lsm || !lsm->lsm_object_id)
RETURN(-ENOMEM);
+ if ((iobuf->offset & (blocksize - 1)) ||
+ (iobuf->length & (blocksize - 1)))
+ RETURN(-EINVAL);
+
+#if 0
/* XXX Keep here until we find ia64 problem, it crashes otherwise */
if (blocksize != PAGE_SIZE) {
CERROR("direct_IO blocksize != PAGE_SIZE\n");
RETURN(-EINVAL);
}
+#endif
set = obd_brw_set_new();
if (set == NULL)
OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
if (!pga) {
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
RETURN(-ENOMEM);
}
- CDEBUG(D_PAGE, "blocksize %u, blocknr %lu, iobuf %p: nr_pages %u, "
- "array_len %u, offset %u, length %u\n",
- blocksize, blocknr, iobuf, iobuf->nr_pages,
- iobuf->array_len, iobuf->offset, iobuf->length);
-
flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
- offset = (blocknr << inode->i_blkbits) /* + iobuf->offset? */;
+ offset = (blocknr << inode->i_blkbits);
length = iobuf->length;
for (i = 0, length = iobuf->length; length > 0;
pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
length);
pga[i].flag = flags;
- CDEBUG(D_PAGE, "page %d (%p), offset "LPU64", count %u\n",
- i, pga[i].pg, pga[i].off, pga[i].count);
if (rw == READ) {
//POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
//kunmap(iobuf->maplist[i]);
if (rc)
CERROR("error from callback: rc = %d\n", rc);
}
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
if (rc == 0)
rc = iobuf->length;
}
#endif
-int ll_flush_inode_pages(struct inode * inode)
-{
- obd_count bufs_per_obdo = 0;
- obd_size *count = NULL;
- obd_off *offset = NULL;
- obd_flag *flags = NULL;
- int err = 0;
-
- ENTRY;
-
-#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
- spin_lock(&pagecache_lock);
-
- spin_unlock(&pagecache_lock);
-#endif
-
-
- OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo);
- OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo);
- OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo);
- if (!count || !offset || !flags)
- GOTO(out, err=-ENOMEM);
-
-#if 0
- for (i = 0 ; i < bufs_per_obdo ; i++) {
- count[i] = PAGE_SIZE;
- offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT;
- flags[i] = OBD_BRW_CREATE;
- }
-
- err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
- ll_i2info(inode)->lli_smd, bufs_per_obdo,
- iobuf->maplist, count, offset, flags, NULL, NULL);
- if (err == 0)
- err = bufs_per_obdo * 4096;
-#endif
- out:
- OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo);
- OBD_FREE(count, sizeof(*count) * bufs_per_obdo);
- OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo);
- RETURN(err);
-}
-
//#endif
-
struct address_space_operations ll_aops = {
readpage: ll_readpage,
#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
ENTRY;
CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
- if ( strncmp(opt, data, strlen(opt)) )
+ if (strncmp(opt, data, strlen(opt)))
RETURN(NULL);
- if ( (value = strchr(data, '=')) == NULL )
+ if ((value = strchr(data, '=')) == NULL)
RETURN(NULL);
value++;
OBD_ALLOC(retval, strlen(value) + 1);
- if ( !retval ) {
+ if (!retval) {
CERROR("out of memory!\n");
RETURN(NULL);
}
ENTRY;
CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
- if ( strncmp(opt, data, strlen(opt)) )
+ if (strncmp(opt, data, strlen(opt)))
RETURN(0);
else
RETURN(fl);
this_char != NULL;
this_char = strtok (NULL, ",")) {
CDEBUG(D_SUPER, "this_char %s\n", this_char);
- if ( (!*ost && (*ost = ll_read_opt("osc", this_char)))||
- (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
- (!(*flags & LL_SBI_NOLCK) && ((*flags) = (*flags) |
- ll_set_opt("nolock", this_char, LL_SBI_NOLCK))) )
+ if ((!*ost && (*ost = ll_read_opt("osc", this_char)))||
+ (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
+ (!(*flags & LL_SBI_NOLCK) &&
+ ((*flags) = (*flags) |
+ ll_set_opt("nolock", this_char, LL_SBI_NOLCK))))
continue;
}
EXIT;
ENTRY;
if ((attr->ia_valid & ATTR_SIZE)) {
+ /* writeback uses inode->i_size to determine how far out
+ * its cached pages go. ll_truncate gets a PW lock, canceling
+ * our lock, _after_ it has updated i_size. this can confuse
+ * us into zero extending the file to the newly truncated
+ * size, and this has bad implications for a racing o_append.
+ * if we're extending our size we need to flush the pages
+ * with the correct i_size before vmtruncate stomps on
+ * the new i_size. again, this can only find pages to
+ * purge if the PW lock that generated them is still held.
+ */
+ if ( attr->ia_size > inode->i_size ) {
+ filemap_fdatasync(inode->i_mapping);
+ filemap_fdatawait(inode->i_mapping);
+ }
err = vmtruncate(inode, attr->ia_size);
if (err)
RETURN(err);
CDEBUG(D_VFSTRACE, "VFS Op\n");
sema_init(&lli->lli_open_sem, 1);
+ atomic_set(&lli->lli_open_count, 0);
+ lli->lli_flags = 0;
+ init_MUTEX(&lli->lli_getattr_sem);
+ spin_lock_init(&lli->lli_read_extent_lock);
+ INIT_LIST_HEAD(&lli->lli_read_extents);
LASSERT(!lli->lli_smd);
- /* core attributes first */
+ /* core attributes from the MDS first */
ll_update_inode(inode, body, lic ? lic->lic_lmm : NULL);
/* Get the authoritative file size */
if (lli->lli_smd && (inode->i_mode & S_IFREG)) {
- int rc;
+ struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+ struct lustre_handle lockh = {0, 0};
+ struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+ ldlm_error_t rc;
+
LASSERT(lli->lli_smd->lsm_object_id != 0);
- rc = ll_file_size(inode, lli->lli_smd, NULL);
- if (rc) {
- CERROR("ll_file_size: %d\n", rc);
+
+ rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
+ if (rc != ELDLM_OK && rc != ELDLM_LOCK_MATCHED) {
ll_clear_inode(inode);
make_bad_inode(inode);
+ } else {
+ ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
}
}
ENTRY;
CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
- if ( strncmp(opt, data, strlen(opt)) )
+ if (strncmp(opt, data, strlen(opt)))
RETURN(NULL);
- if ( (value = strchr(data, '=')) == NULL )
+ if ((value = strchr(data, '=')) == NULL)
RETURN(NULL);
value++;
OBD_ALLOC(retval, strlen(value) + 1);
- if ( !retval ) {
+ if (!retval) {
CERROR("out of memory!\n");
RETURN(NULL);
}
ENTRY;
CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
- if ( strncmp(opt, data, strlen(opt)) )
+ if (strncmp(opt, data, strlen(opt)))
RETURN(0);
else
RETURN(fl);
while ((this_char = strsep (&opt_ptr, ",")) != NULL) {
CDEBUG(D_SUPER, "this_char %s\n", this_char);
- if ( (!*ost && (*ost = ll_read_opt("osc", this_char)))||
- (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
- (!(*flags & LL_SBI_NOLCK) && ((*flags) = (*flags) |
- ll_set_opt("nolock", this_char, LL_SBI_NOLCK))) )
+ if ((!*ost && (*ost = ll_read_opt("osc", this_char)))||
+ (!*mds && (*mds = ll_read_opt("mdc", this_char)))||
+ (!(*flags & LL_SBI_NOLCK) &&
+ ((*flags) = (*flags) |
+ ll_set_opt("nolock", this_char, LL_SBI_NOLCK))))
continue;
}
EXIT;
ENTRY;
sema_init(&lli->lli_open_sem, 1);
+        lli->lli_flags = 0;
+ init_MUTEX(&lli->lli_getattr_sem);
+ /* these are 2.4 only, but putting them here for consistency.. */
+ spin_lock_init(&lli->lli_read_extent_lock);
+ INIT_LIST_HEAD(&lli->lli_read_extents);
LASSERT(!lli->lli_smd);
/* Get the authoritative file size */
if (lli->lli_smd && S_ISREG(inode->i_mode)) {
- rc = ll_file_size(inode, lli->lli_smd, NULL);
- if (rc) {
- CERROR("ll_file_size: %d\n", rc);
+ struct ll_file_data *fd = file->private_data;
+ struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+ struct lustre_handle lockh = {0, 0};
+
+ LASSERT(lli->lli_smd->lsm_object_id != 0);
+
+ rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
+                if (rc != ELDLM_OK && rc != ELDLM_LOCK_MATCHED) {
ll_clear_inode(inode);
make_bad_inode(inode);
- RETURN(rc);
+ } else {
+                        ll_extent_unlock(fd, inode, lsm, LCK_PR,
+                                         &lockh);
}
}
memset(lli, 0, (char *)&lli->lli_vfs_inode - (char *)lli);
sema_init(&lli->lli_open_sem, 1);
+ init_MUTEX(&lli->lli_size_valid_sem);
return &lli->lli_vfs_inode;
}
struct lov_obd *lov;
struct lov_oinfo *loi;
struct lov_stripe_md submd;
- int rc = 0, i;
+ ldlm_error_t rc = ELDLM_LOCK_MATCHED, err;
+ int i;
ENTRY;
if (!lsm) {
submd.lsm_stripe_count = 0;
/* XXX submd is not fully initialized here */
*flags = 0;
- rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
- parent_lock, type, &sub_ext, sizeof(sub_ext),
- mode, flags, cb, data, datalen, lov_lockhp);
+ err = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
+ parent_lock, type, &sub_ext, sizeof(sub_ext),
+ mode, flags, cb, data, datalen, lov_lockhp);
+
// XXX add a lock debug statement here
- if (rc)
+ /* return _MATCHED only when all locks matched.. */
+ if (err == ELDLM_OK) {
+ rc = ELDLM_OK;
+ } else if (err != ELDLM_LOCK_MATCHED) {
+ rc = err;
memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- if (rc && lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: enqueue objid "LPX64" subobj "LPX64
- " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
- loi->loi_id, loi->loi_ost_idx, rc);
- goto out_locks;
+ if (lov->tgts[loi->loi_ost_idx].active) {
+ CERROR("error: enqueue objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d\n",
+ lsm->lsm_object_id, loi->loi_id,
+ loi->loi_ost_idx, rc);
+ goto out_locks;
+ }
}
}
- RETURN(0);
+ RETURN(rc);
out_locks:
while (loi--, lov_lockhp--, i-- > 0) {
lov = &export->exp_obd->u.lov;
for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
- i++, loi++, lov_lockhp++ ) {
+ i++, loi++, lov_lockhp++) {
struct lov_stripe_md submd;
int err;
__u64 last_transno = 0;
__u64 last_mount;
int rc = 0;
+
+ LASSERT(sizeof(struct mds_client_data) == MDS_LR_SIZE);
+ LASSERT(sizeof(struct mds_server_data) <= MDS_LR_CLIENT);
OBD_ALLOC(msd, sizeof(*msd));
if (!msd)
struct obd_class_user_state *ocus;
ENTRY;
- OBD_ALLOC (ocus, sizeof (*ocus));
+ OBD_ALLOC(ocus, sizeof(*ocus));
if (ocus == NULL)
return (-ENOMEM);
- INIT_LIST_HEAD (&ocus->ocus_conns);
- ocus->ocus_current_obd = NULL;
+ INIT_LIST_HEAD(&ocus->ocus_conns);
file->private_data = ocus;
MOD_INC_USE_COUNT;
int err = 0, len = 0, serialised = 0;
ENTRY;
+ if ((cmd & 0xffffff00) == ((int)'T') << 8) /* ignore all tty ioctls */
+ RETURN(err = -ENOTTY);
+
switch (cmd) {
case OBD_IOC_BRW_WRITE:
case OBD_IOC_BRW_READ:
break;
}
- if (!obd && cmd != OBD_IOC_DEVICE && cmd != TCGETS &&
+ CDEBUG(D_IOCTL, "cmd = %x, obd = %p\n", cmd, obd);
+ if (!obd && cmd != OBD_IOC_DEVICE &&
cmd != OBD_IOC_LIST && cmd != OBD_GET_VERSION &&
cmd != OBD_IOC_NAME2DEV && cmd != OBD_IOC_NEWDEV &&
cmd != OBD_IOC_ADD_UUID && cmd != OBD_IOC_DEL_UUID &&
data = (struct obd_ioctl_data *)buf;
switch (cmd) {
- case TCGETS:
- GOTO(out, err=-EINVAL);
case OBD_IOC_DEVICE: {
CDEBUG(D_IOCTL, "\n");
if (data->ioc_dev >= MAX_OBD_DEVICES || data->ioc_dev < 0) {
int l;
char *status;
struct obd_device *obd = &obd_dev[i];
+
if (!obd->obd_type)
continue;
if (obd->obd_flags & OBD_SET_UP)
#define OBD_MINOR 241
#ifdef __KERNEL__
/* to control /dev/obd */
-static int obd_class_ioctl (struct inode * inode, struct file * filp,
- unsigned int cmd, unsigned long arg)
+static int obd_class_ioctl(struct inode *inode, struct file *filp,
+ unsigned int cmd, unsigned long arg)
{
return class_handle_ioctl(filp->private_data, cmd, arg);
}
/* declare character device */
static struct file_operations obd_psdev_fops = {
- ioctl: obd_class_ioctl, /* ioctl */
- open: obd_class_open, /* open */
- release: obd_class_release, /* release */
+ ioctl: obd_class_ioctl, /* ioctl */
+ open: obd_class_open, /* open */
+ release: obd_class_release, /* release */
};
/* modules setup */
if (count == 1)
atomic_dec(&obd_kmap_count);
else while (atomic_add_negative(-count, &obd_kmap_count)) {
+ struct l_wait_info lwi = { 0 };
static long next_show = 0;
static int skipped = 0;
skipped = 0;
} else
skipped++;
- wait_event(obd_kmap_waitq,
- atomic_read(&obd_kmap_count) >= count);
+ l_wait_event(obd_kmap_waitq,
+ atomic_read(&obd_kmap_count) >= count, &lwi);
}
}
type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
vars, type);
- if (type->typ_procroot && IS_ERR(type->typ_procroot)) {
+ if (IS_ERR(type->typ_procroot)) {
rc = PTR_ERR(type->typ_procroot);
type->typ_procroot = NULL;
list_del(&type->typ_chain);
struct obd_export *class_new_export(struct obd_device *obddev)
{
- struct obd_export * export;
+ struct obd_export *export;
- export = kmem_cache_alloc(export_cachep, GFP_KERNEL);
+ PORTAL_SLAB_ALLOC(export, export_cachep, sizeof(*export));
if (!export) {
CERROR("no memory! (minor %d)\n", obddev->obd_minor);
return NULL;
}
- memset(export, 0, sizeof(*export));
get_random_bytes(&export->exp_cookie, sizeof(export->exp_cookie));
export->exp_obd = obddev;
/* XXX this should be in LDLM init */
ptlrpc_abort_inflight_superhack(&exp->exp_ldlm_data.led_import,
1);
- exp->exp_cookie = DEAD_HANDLE_MAGIC;
- kmem_cache_free(export_cachep, exp);
+ PORTAL_SLAB_FREE(exp, export_cachep, sizeof(*exp));
}
/* a connection defines an export context in which preallocation can
}
OBD_FREE(pga, npages * sizeof(*pga));
out_0:
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
return (rc);
}
out_1:
OBD_FREE(pga, npages * sizeof(*pga));
out_0:
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
return (rc);
}
#else
# This code is issued under the GNU General Public License.
# See the file COPYING in this distribution
-DEFS = $(ENABLE_OST_RECOVERY)
MODULE = obdfilter
modulefs_DATA = obdfilter.o
EXTRA_PROGRAMS = obdfilter
#endif
if (written == sizeof(*fcd))
RETURN(0);
- CERROR("error writing to last_rcvd file: rc = %d\n", written);
+ CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
if (written >= 0)
RETURN(-EIO);
struct filter_client_data *fcd = NULL;
struct inode *inode = filp->f_dentry->d_inode;
unsigned long last_rcvd_size = inode->i_size;
- __u64 mount_count;
+ __u64 mount_count = 0;
int cl_idx;
loff_t off = 0;
int rc;
ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
&off);
if (retval != sizeof(*fsd)) {
- CDEBUG(D_INODE,"OBD filter: error reading lastobjid\n");
- GOTO(out, rc = -EIO);
+ CDEBUG(D_INODE,"OBD filter: error reading %s\n",
+ LAST_RCVD);
+ GOTO(err_fsd, rc = -EIO);
}
mount_count = le64_to_cpu(fsd->fsd_mount_count);
filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
if (fsd->fsd_feature_incompat) {
CERROR("unsupported feature %x\n",
le32_to_cpu(fsd->fsd_feature_incompat));
- RETURN(-EINVAL);
+ GOTO(err_fsd, rc = -EINVAL);
}
if (fsd->fsd_feature_rocompat) {
CERROR("read-only feature %x\n",
le32_to_cpu(fsd->fsd_feature_rocompat));
/* Do something like remount filesystem read-only */
- RETURN(-EINVAL);
+ GOTO(err_fsd, rc = -EINVAL);
}
CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
* the header. If we find clients with higher last_rcvd values
* then those clients may need recovery done.
*/
- if (obd->obd_flags & OBD_REPLAYABLE) {
- for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
- __u64 last_rcvd;
- int mount_age;
-
- if (!fcd) {
- OBD_ALLOC(fcd, sizeof(*fcd));
- if (!fcd)
- GOTO(err_fsd, rc = -ENOMEM);
- }
+ if (!(obd->obd_flags & OBD_REPLAYABLE)) {
+ CERROR("%s: recovery support OFF\n", obd->obd_name);
+ GOTO(out, rc = 0);
+ }
- /* Don't assume off is incremented properly, in case
- * sizeof(fsd) isn't the same as fsd->fsd_client_size.
- */
- off = le32_to_cpu(fsd->fsd_client_start) +
- cl_idx * le16_to_cpu(fsd->fsd_client_size);
- rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
- if (rc != sizeof(*fcd)) {
- CERROR("error reading FILTER %s offset %d: rc = %d\n",
- LAST_RCVD, cl_idx, rc);
- if (rc > 0) /* XXX fatal error or just abort reading? */
- rc = -EIO;
- break;
- }
+ for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
+ __u64 last_rcvd;
+ int mount_age;
- if (fcd->fcd_uuid[0] == '\0') {
- CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
- cl_idx);
- continue;
- }
+ if (!fcd) {
+ OBD_ALLOC(fcd, sizeof(*fcd));
+ if (!fcd)
+ GOTO(err_fsd, rc = -ENOMEM);
+ }
- last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
+ /* Don't assume off is incremented properly, in case
+ * sizeof(fsd) isn't the same as fsd->fsd_client_size.
+ */
+ off = le32_to_cpu(fsd->fsd_client_start) +
+ cl_idx * le16_to_cpu(fsd->fsd_client_size);
+ rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
+ if (rc != sizeof(*fcd)) {
+ CERROR("error reading FILTER %s offset %d: rc = %d\n",
+ LAST_RCVD, cl_idx, rc);
+ if (rc > 0) /* XXX fatal error or just abort reading? */
+ rc = -EIO;
+ break;
+ }
- /* These exports are cleaned up by filter_disconnect(), so they
- * need to be set up like real exports as filter_connect() does.
- */
- mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
- if (mount_age < FILTER_MOUNT_RECOV) {
- struct obd_export *exp = class_new_export(obd);
- struct filter_export_data *fed;
- CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
- " srv lr: "LPU64" mnt: "LPU64" last mount: "
- LPU64"\n", fcd->fcd_uuid, cl_idx,
- last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
- le64_to_cpu(fcd->fcd_mount_count), mount_count);
- /* disabled until OST recovery is actually working */
-
- if (!exp) {
- rc = -ENOMEM;
- break;
- }
- memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
- sizeof exp->exp_client_uuid.uuid);
- fed = &exp->exp_filter_data;
- fed->fed_fcd = fcd;
- filter_client_add(filter, fed, cl_idx);
- /* create helper if export init gets more complex */
- INIT_LIST_HEAD(&fed->fed_open_head);
- spin_lock_init(&fed->fed_lock);
-
- fcd = NULL;
- obd->obd_recoverable_clients++;
- } else {
- CDEBUG(D_INFO,
- "discarded client %d UUID '%s' count "LPU64"\n",
- cl_idx, fcd->fcd_uuid,
- le64_to_cpu(fcd->fcd_mount_count));
- }
+ if (fcd->fcd_uuid[0] == '\0') {
+ CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+ cl_idx);
+ continue;
+ }
- CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
- cl_idx, last_rcvd);
+ last_rcvd = le64_to_cpu(fcd->fcd_last_rcvd);
- if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
- filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
+ /* These exports are cleaned up by filter_disconnect(), so they
+ * need to be set up like real exports as filter_connect() does.
+ */
+ mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
+ if (mount_age < FILTER_MOUNT_RECOV) {
+ struct obd_export *exp = class_new_export(obd);
+ struct filter_export_data *fed;
+ CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+ " srv lr: "LPU64" mnt: "LPU64" last mount: "
+ LPU64"\n", fcd->fcd_uuid, cl_idx,
+ last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
+ le64_to_cpu(fcd->fcd_mount_count), mount_count);
+ /* disabled until OST recovery is actually working */
+
+ if (!exp) {
+ rc = -ENOMEM;
+ break;
+ }
+ memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
+ sizeof exp->exp_client_uuid.uuid);
+ fed = &exp->exp_filter_data;
+ fed->fed_fcd = fcd;
+ filter_client_add(filter, fed, cl_idx);
+ /* create helper if export init gets more complex */
+ INIT_LIST_HEAD(&fed->fed_open_head);
+ spin_lock_init(&fed->fed_lock);
+
+ fcd = NULL;
+ obd->obd_recoverable_clients++;
+ } else {
+ CDEBUG(D_INFO,
+ "discarded client %d UUID '%s' count "LPU64"\n",
+ cl_idx, fcd->fcd_uuid,
+ le64_to_cpu(fcd->fcd_mount_count));
}
+ CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
+ cl_idx, last_rcvd);
+
+ if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
+ filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
+
obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
if (obd->obd_recoverable_clients) {
- CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
- obd->obd_recoverable_clients,
+ CERROR("RECOVERY: %d recoverable clients, last_rcvd "
+ LPU64"\n", obd->obd_recoverable_clients,
le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
obd->obd_flags |= OBD_RECOVERING;
if (fcd)
OBD_FREE(fcd, sizeof(*fcd));
- } else {
- CERROR("%s: recovery support OFF\n", obd->obd_name);
}
+out:
fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
/* save it,so mount count and last_recvd is current */
rc = filter_update_server_data(filp, filter->fo_fsd);
-out:
RETURN(rc);
err_fsd:
if (filter->fo_subdir_count) {
O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
OBD_ALLOC(filter->fo_dentry_O_sub,
- FILTER_SUBDIR_COUNT * sizeof(dentry));
+ filter->fo_subdir_count * sizeof(dentry));
if (!filter->fo_dentry_O_sub)
GOTO(err_client, rc = -ENOMEM);
mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
rc = PTR_ERR(mnt);
- if (IS_ERR(mnt)) {
- CERROR("mount of %s as type %s failed: rc %d\n",
- data->ioc_inlbuf2, data->ioc_inlbuf1, rc);
+ if (IS_ERR(mnt))
GOTO(err_ops, rc);
- }
#if OST_RECOVERY
obd->obd_flags |= OBD_REPLAYABLE;
#endif
- filter = &obd->u.filter;;
+ filter = &obd->u.filter;
filter->fo_vfsmnt = mnt;
filter->fo_fstype = strdup(data->ioc_inlbuf2);
filter->fo_sb = mnt->mnt_root->d_inode->i_sb;
RETURN(rc);
exp = class_conn2export(conn);
LASSERT(exp);
+
fed = &exp->exp_filter_data;
+ INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
+ spin_lock_init(&exp->exp_filter_data.fed_lock);
+
+ if (!(obd->obd_flags & OBD_REPLAYABLE))
+ RETURN(0);
+
OBD_ALLOC(fcd, sizeof(*fcd));
if (!fcd) {
CERROR("filter: out of memory for client data\n");
fed->fed_fcd = fcd;
fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
- INIT_LIST_HEAD(&exp->exp_filter_data.fed_open_head);
- spin_lock_init(&exp->exp_filter_data.fed_lock);
-
- if (obd->obd_flags & OBD_REPLAYABLE) {
- rc = filter_client_add(filter, fed, -1);
- if (rc)
- GOTO(out_fcd, rc);
- }
+ rc = filter_client_add(filter, fed, -1);
+ if (rc)
+ GOTO(out_fcd, rc);
RETURN(rc);
ldlm_cancel_locks_for_export(exp);
- if (exp->exp_obd->obd_flags & OBD_REPLAYABLE)
+ if (exp->exp_obd->obd_flags & OBD_REPLAYABLE)
filter_client_free(exp);
rc = class_disconnect(conn);
/* This would only happen if lastobjid was bad on disk */
CERROR("objid %s already exists\n",
- filter_id(buf, filter, S_IFREG, oa->o_id));
+ filter_id(buf, filter, oa->o_mode, oa->o_id));
LBUG();
GOTO(out, rc = -EEXIST);
}
/* This page is currently locked, so get a temporary page instead. */
- /* XXX I believe this is a very dangerous thing to do - consider if
- * we had multiple writers for the same file (definitely the case
- * if we are using this codepath). If writer A locks the page,
- * writer B writes to a copy (as here), writer A drops the page
- * lock, and writer C grabs the lock before B does, then B will
- * later overwrite the data from C, even if C had LDLM locked
- * and initiated the write after B did.
- */
if (!page) {
unsigned long addr;
CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
o->ioo_id),
o->ioo_id, 0);
- if (IS_ERR(dentry))
+ if (IS_ERR(dentry))
GOTO(out_objinfo, rc = PTR_ERR(dentry));
fso[i].fso_dentry = dentry;
for (i = 0; i < objcount; i++, o++) {
struct dentry *dentry;
struct inode *inode;
+ int (*fs_bmap)(struct address_space *, long);
int j;
dentry = filter_fid2dentry(obd, filter_parent(obd, S_IFREG,
f_dput(dentry);
GOTO(out, rc = -ENOENT);
}
+ fs_bmap = inode->i_mapping->a_ops->bmap;
for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
long block;
- block = rnb->offset >> PAGE_SHIFT;
+ block = rnb->offset >> inode->i_blkbits;
if (cmd == OBD_BRW_READ) {
- block = inode->i_mapping->a_ops->bmap(
- inode->i_mapping, block);
+ block = fs_bmap(inode->i_mapping, block);
} else {
loff_t newsize = rnb->offset + rnb->len;
/* fs_prep_san_write will also update inode
unsigned long index = 0;
int err = 0;
+ LBUG(); /* THIS CODE IS NOT CORRECT -phil */
+
memset(&srcmd, 0, sizeof(srcmd));
memset(&dstmd, 0, sizeof(dstmd));
srcmd.lsm_object_id = src->o_id;
page->index = index;
set->brw_callback = ll_brw_sync_wait;
err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, set,NULL);
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
if (err) {
EXIT;
break;
set->brw_callback = ll_brw_sync_wait;
err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, set,oti);
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
/* XXX should handle dst->o_size, dst->o_blocks here */
if (err) {
LASSERT(desc->bd_brw_set != NULL);
LASSERT(desc->bd_brw_set->brw_callback != NULL);
+ /* It's important that you don't use desc->bd_brw_set after this
+ * callback runs. If you do, take a reference on it. */
desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
/* We can't kunmap the desc from interrupt context, so we do it from
LASSERT(desc->bd_brw_set != NULL);
- ptlrpc_abort_bulk(desc);
+ /* XXX reconcile this with ll_sync_brw_timeout() handling, and/or
+ * just make osc_ptl_ev_hdlr() check desc->bd_flags for either
+ * PTL_BULK_FL_RCVD or PTL_BULK_FL_SENT, and pass CB_PHASE_ABORT
+ * to brw_callback() and do the rest of the cleanup there. I
+	 * also think ll_sync_brw_timeout() is missing a PtlMEUnlink,
+ * but I could be wrong.
+ */
+ if (ptlrpc_abort_bulk(desc)) {
+ EXIT;
+ return;
+ }
obd_brw_set_del(desc);
unmap_and_decref_bulk_desc(desc);
struct ost_body *body;
int rc, size[3] = {sizeof(*body)}, mapped = 0;
struct obd_ioobj *iooptr;
- void *nioptr;
+ struct niobuf_remote *nioptr;
__u32 xid;
ENTRY;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, lsm, page_count);
+ ost_pack_ioo(iooptr, lsm, page_count);
/* end almost identical to brw_write case */
xid = ptlrpc_next_xid(); /* single xid for all pages */
obd_kmap_get(page_count, 0);
- for (mapped = 0; mapped < page_count; mapped++) {
+ for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL) {
unmap_and_decref_bulk_desc(desc);
GOTO(out_req, rc = -ENOMEM);
}
- bulk->bp_xid = xid; /* single xid for all pages */
+ LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
+ bulk->bp_xid = xid; /* single xid for all pages */
bulk->bp_buf = kmap(pga[mapped].pg);
bulk->bp_page = pga[mapped].pg;
bulk->bp_buflen = PAGE_SIZE;
- ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, bulk->bp_xid);
}
struct ost_body *body;
int rc, size[3] = {sizeof(*body)}, mapped = 0;
struct obd_ioobj *iooptr;
- void *nioptr;
+ struct niobuf_remote *nioptr;
__u32 xid;
#if CHECKSUM_BULK
__u64 cksum = 0;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, lsm, page_count);
+ ost_pack_ioo(iooptr, lsm, page_count);
/* end almost identical to brw_read case */
xid = ptlrpc_next_xid(); /* single xid for all pages */
obd_kmap_get(page_count, 0);
- for (mapped = 0; mapped < page_count; mapped++) {
+ for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL) {
unmap_and_decref_bulk_desc(desc);
GOTO(out_req, rc = -ENOMEM);
}
- bulk->bp_xid = xid; /* single xid for all pages */
+ LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
+ bulk->bp_xid = xid; /* single xid for all pages */
bulk->bp_buf = kmap(pga[mapped].pg);
bulk->bp_page = pga[mapped].pg;
+ /* matching ptlrpc_bulk_get assert */
+ LASSERT(pga[mapped].count > 0);
bulk->bp_buflen = pga[mapped].count;
- ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, bulk->bp_xid);
ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
}
#define OSC_BRW_MAX_SIZE 65536
#define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
+#warning "FIXME: make these values dynamic based on a get_info call at setup"
+#define OSC_BRW_MAX_SIZE 65536
+#define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
+
static int osc_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count page_count,
struct brw_page *pga, struct obd_brw_set *set,
/* Note: caller will lock/unlock, and set uptodate on the pages */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
static int sanosc_brw_read(struct lustre_handle *conn,
- struct lov_stripe_md *md,
+ struct lov_stripe_md *lsm,
obd_count page_count,
struct brw_page *pga,
struct obd_brw_set *set)
{
struct ptlrpc_request *request = NULL;
struct ost_body *body;
- struct niobuf_remote *remote, *nio_rep;
- int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+ struct niobuf_remote *nioptr;
struct obd_ioobj *iooptr;
- void *nioptr;
+ int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
ENTRY;
size[1] = sizeof(struct obd_ioobj);
- size[2] = page_count * sizeof(*remote);
+ size[2] = page_count * sizeof(*nioptr);
request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
size, NULL);
body = lustre_msg_buf(request->rq_reqmsg, 0);
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, md, page_count);
+ ost_pack_ioo(iooptr, lsm, page_count);
obd_kmap_get(page_count, 0);
- for (mapped = 0; mapped < page_count; mapped++) {
+ for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
LASSERT(PageLocked(pga[mapped].pg));
+ LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
kmap(pga[mapped].pg);
- ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, 0);
}
- size[1] = page_count * sizeof(*remote);
+ size[1] = page_count * sizeof(*nioptr);
request->rq_replen = lustre_msg_size(2, size);
rc = ptlrpc_queue_wait(request);
GOTO(out_unmap, rc = -EINVAL);
}
- for (j = 0; j < page_count; j++) {
- ost_unpack_niobuf(&nioptr, &remote);
- }
-
- nioptr = lustre_msg_buf(request->rq_repmsg, 1);
- nio_rep = (struct niobuf_remote*)nioptr;
-
/* actual read */
- for (j = 0; j < page_count; j++) {
+ for (j = 0; j < page_count; j++, nioptr++) {
struct page *page = pga[j].pg;
struct buffer_head *bh;
kdev_t dev;
+ ost_unpack_niobuf(nioptr, nioptr);
/* got san device associated */
LASSERT(class_conn2obd(conn));
dev = class_conn2obd(conn)->u.cli.cl_sandev;
/* hole */
- if (!nio_rep[j].offset) {
+ if (!nioptr->offset) {
CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
page->mapping->host->i_ino,
page->index);
clear_bit(BH_New, &bh->b_state);
set_bit(BH_Mapped, &bh->b_state);
- bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+ bh->b_blocknr = (unsigned long)nioptr->offset;
clear_bit(BH_Uptodate, &bh->b_state);
* one we mapped before, check it */
LASSERT(!test_bit(BH_New, &bh->b_state));
LASSERT(test_bit(BH_Mapped, &bh->b_state));
- LASSERT(bh->b_blocknr ==
- (unsigned long)nio_rep[j].offset);
+ LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
/* wait it's io completion */
if (test_bit(BH_Lock, &bh->b_state))
}
static int sanosc_brw_write(struct lustre_handle *conn,
- struct lov_stripe_md *md,
+ struct lov_stripe_md *lsm,
obd_count page_count,
struct brw_page *pga,
struct obd_brw_set *set)
{
struct ptlrpc_request *request = NULL;
struct ost_body *body;
- struct niobuf_remote *remote, *nio_rep;
- int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+ struct niobuf_remote *nioptr;
struct obd_ioobj *iooptr;
- void *nioptr;
+ int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
ENTRY;
size[1] = sizeof(struct obd_ioobj);
- size[2] = page_count * sizeof(*remote);
+ size[2] = page_count * sizeof(*nioptr);
request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
3, size, NULL);
body = lustre_msg_buf(request->rq_reqmsg, 0);
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, md, page_count);
+ ost_pack_ioo(iooptr, lsm, page_count);
/* map pages, and pack request */
obd_kmap_get(page_count, 0);
- for (mapped = 0; mapped < page_count; mapped++) {
+ for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
LASSERT(PageLocked(pga[mapped].pg));
+ LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
kmap(pga[mapped].pg);
- ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, 0);
}
- size[1] = page_count * sizeof(*remote);
+ size[1] = page_count * sizeof(*nioptr);
request->rq_replen = lustre_msg_size(2, size);
rc = ptlrpc_queue_wait(request);
GOTO(out_unmap, rc = -EINVAL);
}
- for (j = 0; j < page_count; j++) {
- ost_unpack_niobuf(&nioptr, &remote);
- }
-
- nioptr = lustre_msg_buf(request->rq_repmsg, 1);
- nio_rep = (struct niobuf_remote*)nioptr;
-
/* actual write */
- for (j = 0; j < page_count; j++) {
+ for (j = 0; j < page_count; j++, nioptr++) {
struct page *page = pga[j].pg;
struct buffer_head *bh;
kdev_t dev;
+ ost_unpack_niobuf(nioptr, nioptr);
/* got san device associated */
LASSERT(class_conn2obd(conn));
dev = class_conn2obd(conn)->u.cli.cl_sandev;
LASSERT(!test_bit(BH_New, &page->buffers->b_state));
LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
LASSERT(page->buffers->b_blocknr ==
- (unsigned long)nio_rep[j].offset);
+ (unsigned long)nioptr->offset);
}
bh = page->buffers;
set_bit(BH_Mapped, &bh->b_state);
/* override the block nr */
- bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+ bh->b_blocknr = (unsigned long)nioptr->offset;
/* we are about to write it, so set it
* uptodate/dirty
goto out_req;
}
-#else
-static int sanosc_brw_read(struct lustre_handle *conn,
- struct lov_stripe_md *md,
- obd_count page_count,
- struct brw_page *pga,
- struct obd_brw_set *set)
-{
- LBUG();
- return 0;
-}
-
-static int sanosc_brw_write(struct lustre_handle *conn,
- struct lov_stripe_md *md,
- obd_count page_count,
- struct brw_page *pga,
- struct obd_brw_set *set)
-{
- LBUG();
- return 0;
-}
-#endif
static int sanosc_brw(int cmd, struct lustre_handle *conn,
- struct lov_stripe_md *md, obd_count page_count,
+ struct lov_stripe_md *lsm, obd_count page_count,
struct brw_page *pga, struct obd_brw_set *set,
struct obd_trans_info *oti)
{
pages_per_brw = page_count;
if (cmd & OBD_BRW_WRITE)
- rc = sanosc_brw_write(conn, md, pages_per_brw,
+ rc = sanosc_brw_write(conn, lsm, pages_per_brw,
pga, set);
else
- rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set);
+ rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga,set);
if (rc != 0)
RETURN(rc);
RETURN(0);
}
#endif
+#endif
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
struct lustre_handle *parent_lock,
sizeof(extent), mode, lockh);
if (rc == 1)
/* We already have a lock, and it's referenced */
- RETURN(ELDLM_OK);
+ RETURN(ELDLM_LOCK_MATCHED);
/* If we're trying to read, we also search for an existing PW lock. The
* VFS and page cache already protect us locally, so lots of readers/
ldlm_lock_addref(lockh, LCK_PR);
ldlm_lock_decref(lockh, LCK_PW);
- RETURN(ELDLM_OK);
+ RETURN(ELDLM_LOCK_MATCHED);
}
}
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ptlrpc_bulk_desc *desc;
- struct obd_ioobj *tmp1;
- void *tmp2, *end2;
struct niobuf_remote *remote_nb;
struct niobuf_local *local_nb = NULL;
struct obd_ioobj *ioo;
struct ost_body *body;
struct l_wait_info lwi;
void *desc_priv = NULL;
+ void *end2;
int cmd, i, j, objcount, niocount, size = sizeof(*body);
int rc = 0;
#if CHECKSUM_BULK
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
- tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
- end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+ ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+ end2 = (char *)remote_nb + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
cmd = OBD_BRW_READ;
if (rc)
GOTO(out, req->rq_status = rc);
- for (i = 0; i < objcount; i++) {
- ost_unpack_ioo(&tmp1, &ioo);
- if (tmp2 + ioo->ioo_bufcnt > end2) {
+ for (i = 0; i < objcount; i++, ioo++) {
+ ost_unpack_ioo(ioo, ioo);
+ if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
+ CERROR("BRW: objid "LPX64" count %u larger than %u\n",
+ ioo->ioo_id, ioo->ioo_bufcnt,
+ (int)(end2 - (void *)remote_nb));
LBUG();
- GOTO(out, rc = -EFAULT);
+ GOTO(out, rc = -EINVAL);
}
- for (j = 0; j < ioo->ioo_bufcnt; j++) {
- /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
- ost_unpack_niobuf(&tmp2, &remote_nb);
+ for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) {
+ ost_unpack_niobuf(remote_nb, remote_nb);
+ if (remote_nb->len == 0) {
+ CERROR("zero len BRW: objid "LPX64" buf %u\n",
+ ioo->ioo_id, j);
+ GOTO(out, rc = -EINVAL);
+ }
+ if (j && remote_nb->offset <= (remote_nb - 1)->offset) {
+ CERROR("unordered BRW: objid "LPX64
+ " buf %u offset "LPX64" <= "LPX64"\n",
+ ioo->ioo_id, j, remote_nb->offset,
+ (remote_nb - 1)->offset);
+ GOTO(out, rc = -EINVAL);
+ }
}
}
if (local_nb == NULL)
GOTO(out, rc = -ENOMEM);
- /* The unpackers move tmp1 and tmp2, so reset them before using */
+ /* The unpackers move ioo and remote_nb, so reset them before using */
ioo = lustre_msg_buf(req->rq_reqmsg, 1);
remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ptlrpc_bulk_desc *desc;
- struct obd_ioobj *tmp1;
- void *tmp2, *end2;
struct niobuf_remote *remote_nb;
+ void *end2;
struct niobuf_local *local_nb = NULL;
struct obd_ioobj *ioo;
struct ost_body *body;
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
- tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
- end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+ ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+ end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
cmd = OBD_BRW_WRITE;
if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
GOTO(out, req->rq_status = -EIO);
- for (i = 0; i < objcount; i++) {
- ost_unpack_ioo(&tmp1, &ioo);
- if (tmp2 + ioo->ioo_bufcnt > end2) {
+ for (i = 0; i < objcount; i++, ioo++) {
+ ost_unpack_ioo(ioo, ioo);
+ if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
+ CERROR("BRW: objid "LPX64" count %u larger than %u\n",
+ ioo->ioo_id, ioo->ioo_bufcnt,
+ (int)(end2 - (void *)remote_nb));
LBUG();
- GOTO(out, rc = -EFAULT);
+ GOTO(out, rc = -EINVAL);
}
- for (j = 0; j < ioo->ioo_bufcnt; j++) {
- /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
- ost_unpack_niobuf(&tmp2, &remote_nb);
+ for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) {
+ ost_unpack_niobuf(remote_nb, remote_nb);
+ if (remote_nb->len == 0) {
+ CERROR("zero len BRW: objid "LPX64" buf %u\n",
+ ioo->ioo_id, j);
+ GOTO(out, rc = -EINVAL);
+ }
+ if (j && remote_nb->offset <= (remote_nb - 1)->offset) {
+ CERROR("unordered BRW: objid "LPX64
+ " buf %u offset "LPX64" <= "LPX64"\n",
+ ioo->ioo_id, j, remote_nb->offset,
+ (remote_nb - 1)->offset);
+ GOTO(out, rc = -EINVAL);
+ }
}
}
if (local_nb == NULL)
GOTO(out, rc = -ENOMEM);
- /* The unpackers move tmp1 and tmp2, so reset them before using */
+ /* The unpackers move ioo and remote_nb, so reset them before using */
ioo = lustre_msg_buf(req->rq_reqmsg, 1);
remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+
req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
remote_nb, local_nb, &desc_priv, oti);
struct obd_ioobj *ioo;
struct ost_body *body;
int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
- void *tmp1, *tmp2, *end2;
+ void *end2;
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
- tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
- end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+ ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+ end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
-
+
cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ;
- for (i = 0; i < objcount; i++) {
- ost_unpack_ioo((void *)&tmp1, &ioo);
- if (tmp2 + ioo->ioo_bufcnt > end2) {
- rc = -EFAULT;
- break;
+ for (i = 0; i < objcount; i++, ioo++) {
+ ost_unpack_ioo(ioo, ioo);
+ if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
+ CERROR("BRW: objid "LPX64" count %u larger than %u\n",
+ ioo->ioo_id, ioo->ioo_bufcnt,
+ (int)(end2 - (void *)remote_nb));
+ GOTO(out, rc = -EINVAL);
}
- for (j = 0; j < ioo->ioo_bufcnt; j++)
- ost_unpack_niobuf((void *)&tmp2, &remote_nb);
+ for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++)
+ ost_unpack_niobuf(remote_nb, remote_nb);
}
size[1] = niocount * sizeof(*remote_nb);
if (rc)
GOTO(out, rc);
- /* The unpackers move tmp1 and tmp2, so reset them before using */
- tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
- tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ /* The unpackers move ioo and remote_nb, so reset them before using */
+ ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
- req->rq_status = obd_san_preprw(cmd, conn, objcount, tmp1,
- niocount, tmp2);
+ req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo,
+ niocount, remote_nb);
if (req->rq_status) {
rc = 0;
remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
res_nb = lustre_msg_buf(req->rq_reqmsg, 2);
- for (i = 0; i < niocount; i++) {
- /* this advances remote_nb */
- ost_pack_niobuf((void **)&remote_nb,
- res_nb[i].offset,
- res_nb[i].len, /* 0 */
- res_nb[i].flags, /* 0 */
- res_nb[i].xid
- );
- }
+ for (i = 0; i < niocount; i++, remote_nb++, res_nb++)
+ ost_pack_niobuf(remote_nb, res_nb->offset, res_nb->len,
+ res_nb->flags, res_nb->xid);
rc = 0;
spin_unlock_irq(&io_request_lock);
/* XXX dunno if we're supposed to get this or not.. */
+ /* __make_request() changes READA to READ - Kris */
LASSERT(req->cmd != READA);
if ( req->cmd == READ )
else
cmd = PTLBD_WRITE;
- ptlbd_send_req(ptlbd, cmd, req->bh);
+ ptlbd_send_req(ptlbd, cmd, req);
spin_lock_irq(&io_request_lock);
for ( i = 0 ; i < PTLBD_MAX_MINOR ; i++) {
ptlbd_size_size[i] = 4096;
- ptlbd_size[i] = (4096*2048) >> BLOCK_SIZE_BITS;
+ /* avoid integer overflow */
+ ptlbd_size[i] = (16*1024*((1024*1024) >> BLOCK_SIZE_BITS));
ptlbd_hardsect_size[i] = 4096;
ptlbd_max_sectors[i] = 2;
//RHism ptlbd_dev_varyio[i] = 0;
void ptlbd_blk_exit(void)
{
- int ret;
ENTRY;
blk_cleanup_queue(BLK_DEFAULT_QUEUE(PTLBD_MAJOR));
- ret = unregister_blkdev(PTLBD_MAJOR, "ptlbd");
- if ( ret ) /* XXX */
- printk("unregister_blkdev() failed: %d\n", ret);
+ unregister_blkdev(PTLBD_MAJOR, "ptlbd");
}
#undef MAJOR_NR
static int ptlbd_cl_cleanup(struct obd_device *obddev)
{
-// struct ptlbd_obd *ptlbd = &obddev->u.ptlbd;
+ struct ptlbd_obd *ptlbd = &obddev->u.ptlbd;
ENTRY;
- CERROR("I should be cleaning things up\n");
+ if (!ptlbd)
+ RETURN(-ENOENT);
+
+ if (!ptlbd->bd_import.imp_connection)
+ RETURN(-ENOENT);
+
+ ptlrpc_cleanup_client(&ptlbd->bd_import);
+ ptlrpc_put_connection(ptlbd->bd_import.imp_connection);
RETURN(0);
}
ENTRY;
ptlbd_cl_exit();
ptlbd_sv_exit();
+ ptlbd_blk_exit();
EXIT;
}
#include <linux/lprocfs_status.h>
#include <linux/obd_ptlbd.h>
+#define RSP_OK 0
+#define RSP_NOTOK -1
+#define RQ_OK 0
+
int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
- struct buffer_head *first_bh)
+ struct request *blkreq)
{
+ struct buffer_head *first_bh = blkreq->bh;
struct obd_import *imp = &ptlbd->bd_import;
struct ptlbd_op *op;
struct ptlbd_niob *niob, *niobs;
req->rq_level = imp->imp_level;
rc = ptlrpc_queue_wait(req);
- if ( rc == 0 ) {
- rsp = lustre_msg_buf(req->rq_repmsg, 0);
- /* XXX do stuff */
+ if ( rc != 0 ) {
+ blkreq->errors++;
+ GOTO(out_desc, rc);
+ }
+ rsp = lustre_msg_buf(req->rq_repmsg, 0);
+ if (rsp->r_status != RSP_OK) {
+ blkreq->errors += rsp->r_error_cnt;
}
out_desc:
RETURN(1);
}
-void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
+int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
int page_count, struct list_head *page_list)
{
mm_segment_t old_fs;
struct list_head *pos;
+ int status = RSP_OK;
ENTRY;
old_fs = get_fs();
struct page *page = list_entry(pos, struct page, list);
loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) +
niobs->n_offset;
-
- if ( op == PTLBD_READ )
- ret = filp->f_op->read(filp, page_address(page),
- niobs->n_length, &offset);
- else
- ret = filp->f_op->write(filp, page_address(page),
- niobs->n_length, &offset);
+ if ( op == PTLBD_READ ) {
+ if ((ret = filp->f_op->read(filp, page_address(page),
+ niobs->n_length, &offset)) != niobs->n_length)
+ status = ret;
+ goto out;
+ } else {
+ if ((ret = filp->f_op->write(filp, page_address(page),
+ niobs->n_length, &offset)) != niobs->n_length)
+ status = ret;
+ goto out;
+ }
niobs++;
}
-
+out:
set_fs(old_fs);
- EXIT;
+ RETURN(status);
}
int ptlbd_parse_req(struct ptlrpc_request *req)
struct ptlrpc_bulk_desc *desc;
struct file *filp = req->rq_obd->u.ptlbd.filp;
struct l_wait_info lwi;
- int size[1], wait_flag, i, page_count, rc;
+ int size[1], wait_flag, i, page_count, rc, error_cnt = 0,
+ status = RSP_OK;
struct list_head *pos, *n;
LIST_HEAD(tmp_pages);
ENTRY;
GOTO(out_bulk, rc = -ENOMEM);
list_add(&bulk->bp_page->list, &tmp_pages);
- /*
- * XXX what about the block number?
- */
bulk->bp_xid = niob->n_xid;
bulk->bp_buf = page_address(bulk->bp_page);
bulk->bp_buflen = niob->n_length;
}
if ( op->op_cmd == PTLBD_READ ) {
- ptlbd_do_filp(filp, PTLBD_READ, niobs, page_count, &tmp_pages);
+ if ((status = ptlbd_do_filp(filp, PTLBD_READ, niobs,
+ page_count, &tmp_pages)) < 0) {
+ error_cnt++;
+ }
rc = ptlrpc_bulk_put(desc);
wait_flag = PTL_BULK_FL_SENT;
} else {
if ( rsp == NULL )
GOTO(out, rc = -EINVAL);
- ptlbd_do_filp(filp, PTLBD_WRITE, niobs, page_count, &tmp_pages);
+ if ( op->op_cmd == PTLBD_WRITE ) {
+ if ((status = ptlbd_do_filp(filp, PTLBD_WRITE, niobs,
+ page_count, &tmp_pages)) < 0) {
+ error_cnt++;
+ }
+ }
- rsp->r_error_cnt = 42;
- rsp->r_status = 69;
+ rsp->r_error_cnt = error_cnt;
+ rsp->r_status = status; /* I/O status */
+ req->rq_status = RQ_OK ; /* XXX */ /* ptlbd req status */
- req->rq_status = 0; /* XXX */
ptlrpc_reply(req->rq_svc, req);
out_bulk:
#include <linux/lprocfs_status.h>
#include <linux/obd_ptlbd.h>
+#define BACKING_FILE "/tmp/ptlbd-backing-file-la-la-la"
+
static int ptlbd_sv_already_setup = 1;
static int ptlbd_sv_setup(struct obd_device *obddev, obd_count len, void *buf)
int rc;
ENTRY;
- ptlbd->filp = filp_open("/tmp/ptlbd-backing-file-la-la-la",
- O_RDWR|O_CREAT, 0600);
+ ptlbd->filp = filp_open(BACKING_FILE,
+ O_RDWR|O_CREAT|O_LARGEFILE, 0600);
+
if ( IS_ERR(ptlbd->filp) )
RETURN(PTR_ERR(ptlbd->filp));
int rc = 0;
ENTRY;
+ obd_brw_set_addref(set);
switch(phase) {
case CB_PHASE_START:
lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ll_sync_brw_timeout,
ll_sync_brw_intr, set);
rc = l_wait_event(set->brw_waitq,
- atomic_read(&set->brw_refcount) == 0, &lwi);
+ atomic_read(&set->brw_desc_count) == 0, &lwi);
list_for_each_safe(tmp, next, &set->brw_desc_head) {
struct ptlrpc_bulk_desc *desc =
}
break;
case CB_PHASE_FINISH:
- if (atomic_dec_and_test(&set->brw_refcount))
+ if (atomic_dec_and_test(&set->brw_desc_count))
wake_up(&set->brw_waitq);
break;
default:
LBUG();
}
+ obd_brw_set_decref(set);
RETURN(rc);
}
RETURN(NULL);
}
+ request->rq_timeout = obd_timeout;
request->rq_level = LUSTRE_CONN_FULL;
request->rq_type = PTL_RPC_MSG_REQUEST;
request->rq_import = imp;
interrupted_request, req);
} else {
DEBUG_REQ(D_NET, req, "-- sleeping");
- lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_request,
+ lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
interrupted_request, req);
}
#ifdef __KERNEL__
}
imp->imp_level = LUSTRE_CONN_RECOVD;
spin_unlock_irqrestore(&imp->imp_lock, flags);
- rc = imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN);
- if (rc)
- LBUG();
+ if (imp->imp_recover != NULL) {
+ rc = imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN);
+ if (rc)
+ LBUG();
+ }
GOTO(out, rc = -EIO);
}
{
unsigned long flags;
struct list_head *tmp, *n;
+ ENTRY;
/* Make sure that no new requests get processed for this import.
* ptlrpc_queue_wait must (and does) hold imp_lock while testing this
req->rq_import = NULL;
wake_up(&req->rq_wait_for_rep);
}
+ EXIT;
}
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
{
+ int rc1, rc2;
/* This should be safe: these handles are initialized to be
* invalid in ptlrpc_prep_bulk() */
- PtlMDUnlink(desc->bd_md_h);
- PtlMEUnlink(desc->bd_me_h);
+ rc1 = PtlMDUnlink(desc->bd_md_h);
+ if (rc1 != PTL_OK)
+ CERROR("PtlMDUnlink: %d\n", rc1);
+ rc2 = PtlMEUnlink(desc->bd_me_h);
+ if (rc2 != PTL_OK)
+ CERROR("PtlMEUnlink: %d\n", rc2);
+
+ return rc1 ? rc1 : rc2;
+}
- return 0;
+void obd_brw_set_addref(struct obd_brw_set *set)
+{
+ atomic_inc(&set->brw_refcount);
}
void obd_brw_set_add(struct obd_brw_set *set, struct ptlrpc_bulk_desc *desc)
LASSERT(list_empty(&desc->bd_set_chain));
ptlrpc_bulk_addref(desc);
- atomic_inc(&set->brw_refcount);
+ atomic_inc(&set->brw_desc_count);
desc->bd_brw_set = set;
list_add(&desc->bd_set_chain, &set->brw_desc_head);
}
void obd_brw_set_del(struct ptlrpc_bulk_desc *desc)
{
- atomic_dec(&desc->bd_brw_set->brw_refcount);
+ atomic_dec(&desc->bd_brw_set->brw_desc_count);
list_del_init(&desc->bd_set_chain);
ptlrpc_bulk_decref(desc);
}
if (set != NULL) {
init_waitqueue_head(&set->brw_waitq);
INIT_LIST_HEAD(&set->brw_desc_head);
- atomic_set(&set->brw_refcount, 0);
+ atomic_set(&set->brw_refcount, 1);
+ atomic_set(&set->brw_desc_count, 0);
}
return set;
}
-void obd_brw_set_free(struct obd_brw_set *set)
+static void obd_brw_set_free(struct obd_brw_set *set)
{
struct list_head *tmp, *next;
ENTRY;
return;
}
+void obd_brw_set_decref(struct obd_brw_set *set)
+{
+ ENTRY;
+ if (atomic_dec_and_test(&set->brw_refcount))
+ obd_brw_set_free(set);
+ EXIT;
+}
+
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
if (req->rq_repmsg == NULL) {
EXPORT_SYMBOL(ptlrpc_resend_req);
EXPORT_SYMBOL(ptl_send_rpc);
EXPORT_SYMBOL(ptlrpc_link_svc_me);
-EXPORT_SYMBOL(obd_brw_set_free);
EXPORT_SYMBOL(obd_brw_set_new);
EXPORT_SYMBOL(obd_brw_set_add);
EXPORT_SYMBOL(obd_brw_set_del);
+EXPORT_SYMBOL(obd_brw_set_decref);
+EXPORT_SYMBOL(obd_brw_set_addref);
/* client.c */
EXPORT_SYMBOL(ptlrpc_init_client);
}
CDEBUG(D_RPCTRACE, "Handling RPC ni:pid:xid:nid:opc %d:%d:"LPU64":"
- LPX64":%d\n", rqbd->rqbd_srv_ni - &svc->srv_interfaces[0],
+ LPX64":%d\n", (int)(rqbd->rqbd_srv_ni - svc->srv_interfaces),
NTOH__u32(request->rq_reqmsg->status), request->rq_xid,
event->initiator.nid, NTOH__u32(request->rq_reqmsg->opc));
/* And now, loop forever on requests */
while (1) {
- wait_event(svc->srv_waitq,
- ptlrpc_check_event(svc, thread, event));
+ struct l_wait_info lwi = { 0 };
+ l_wait_event(svc->srv_waitq,
+ ptlrpc_check_event(svc, thread, event), &lwi);
if (thread->t_flags & SVC_STOPPING) {
spin_lock(&svc->srv_lock);
static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
struct ptlrpc_thread *thread)
{
+ struct l_wait_info lwi = { 0 };
+
spin_lock(&svc->srv_lock);
thread->t_flags = SVC_STOPPING;
spin_unlock(&svc->srv_lock);
wake_up(&svc->srv_waitq);
- wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED));
+ l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
+ &lwi);
}
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
char *name)
{
+ struct l_wait_info lwi = { 0 };
struct ptlrpc_svc_data d;
struct ptlrpc_thread *thread;
int rc;
OBD_FREE(thread, sizeof(*thread));
RETURN(rc);
}
- wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING);
+ l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
RETURN(0);
}
cd $RPM_BUILD_DIR/lustre-%{version}
./configure --with-linux='%{linuxdir}' --with-portals='%{portalsdir}' --with-portalslib='%{portalslibdir}'
make
+
+%ifarch i386
cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version}
./configure --with-lib --with-portals='%{portalsdir}' --with-portalslib='%{portalslibdir}'
make
+%endif
%install
cd $RPM_BUILD_DIR/lustre-%{version}
make install prefix=$RPM_BUILD_ROOT
+%ifarch i386
cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version}
make install prefix=$RPM_BUILD_ROOT
+%endif
# Create the pristine source directory.
%files -n lustre-source
%attr(-, root, root) /usr/src/lustre-%{version}
+%ifarch i386
%files -n liblustre
%attr(-, root, root) /lib/lustre
%attr(-, root, root) /lib/lustre/liblov.a
%attr(-, root, root) /usr/sbin/lconf
%attr(-, root, root) /usr/sbin/lmc
%attr(-, root, root) /usr/sbin/llanalyze
+%endif
%files -n lustre-ldap
config=${1:-ba-echo.xml}
+LMC_REAL="${LMC:-../utils/lmc} -m $config"
LMC="save_cmd"
-LMC_REAL="../../lustre/utils/lmc -m $config"
TCPBUF=1048576
OST=${OST:-ba-ost-1}
-CLIENT=client
-
+CLIENT=`hostname`
+
UUIDLIST=${UUIDLIST:-/usr/local/admin/ba-ost/UUID.txt}
h2tcp () {
# server node
${LMC} --add net --node $OST --tcpbuf $TCPBUF --nid $OST --nettype tcp
-${LMC} --add ost --node $OST --ost ost1 --obdtype=obdecho $OST_UUID
+${LMC} --add ost --node $OST --ost ost1 --osdtype=obdecho $OST_UUID
# osc on client
${LMC} --add echo_client --node $CLIENT --ost ost1
int fd;
char *buf;
int blocks;
+ long len;
struct stat st;
int rc;
printf("directio on %s for %dx%lu blocks \n", argv[1], blocks,
st.st_blksize);
- buf = mmap(0, blocks * st.st_blksize, PROT_READ|PROT_WRITE,
- MAP_PRIVATE|MAP_ANON, 0, 0);
+ len = blocks * st.st_blksize;
+ buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0);
if (!buf) {
printf("No memory %s\n", strerror(errno));
return 1;
}
- rc = write(fd, buf, blocks * st.st_blksize);
- if (rc != blocks * st.st_blksize) {
+ memset(buf, 0xba, len);
+ rc = write(fd, buf, len);
+ if (rc != len) {
printf("Write error %s (rc = %d)\n", strerror(errno), rc);
return 1;
}
return 1;
}
- rc = read(fd, buf, blocks * st.st_blksize);
- if (rc != blocks * st.st_blksize) {
+ rc = read(fd, buf, len);
+ if (rc != len) {
printf("Read error: %s (rc = %d)\n", strerror(errno), rc);
return 1;
}
LCMD=$TMP/lkcd-cmds-`hostname`
echo "Storing LKCD module info in $LCMD"
cat /tmp/ogdb-`hostname` | while read JUNK M JUNK; do
- DIR=`dirname $M`
- DIR=`cd $PWD/../$DIR; pwd`
- MOD="$DIR/`basename $M`"
+ MOD="../$M"
MAP=`echo $MOD | sed -e 's/\.o$/.map/'`
- MODNAME=`basename $M | sed -e 's/\.o$//'`
+ MODNAME=`basename $MOD | sed -e 's/\.o$//'`
nm $MOD > $MAP
- echo namelist -a $MOD | tee -a $LCMD
- echo symtab -a $MAP $MODNAME | tee -a $LCMD
+ echo namelist -a $PWD/$MOD | tee -a $LCMD
+ echo symtab -a $PWD/$MAP $MODNAME | tee -a $LCMD
done
sh $mkconfig $config || exit 1
fi
-${LCONF} --cleanup echo.xml
+${LCONF} --cleanup $NAME.xml
pid_t ret;
ret = waitpid(0, &status, 0);
- if (ret == 0) {
+ if (ret == 0)
continue;
- }
if (ret < 0) {
fprintf(stderr, "error: %s: wait - %s\n",
argv[0], ret, err);
if (!rc)
rc = err;
-
- live_threads--;
}
+ live_threads--;
}
} else {
if (threads)
rc = errno;
break;
}
- if (ioctl(fd, LL_IOC_SETFLAGS, &ioctl_flags) < 0) {
+ if (ioctl(fd, LL_IOC_SETFLAGS, &ioctl_flags) < 0 &&
+ errno != ENOTTY) {
fprintf(stderr, "ioctl(): %s\n",
strerror(errno));
rc = errno;
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#define DEBUG 0
+
+// Print the command-line synopsis to stderr and terminate the program
+// with exit status -1 (reported to the shell as 255).
+void
+Usage_and_abort()
+{
+        fprintf(stderr,
+                "Usage: runas -u user_id [ -g grp_id ]"
+                " command_to_be_run \n");
+        exit(-1);
+}
+
+// Usage: runas -u user_id [ -g grp_id ] "command_to_be_run"
+// Returns: the exit status of "command_to_be_run".
+// NOTE: an exit status of -1 (255) is ambiguous -- it may come from
+//       runas itself or from "command_to_be_run".
+
+// Root can switch to any uid/gid directly.  For any other user the
+// runas binary itself must be installed setuid/setgid root
+// (chmod 6755 runas) for the uid/gid switch to succeed.
+
+// Switch to the requested uid/gid, then run the remaining arguments as a
+// shell command via system(), propagating the command's exit status.
+int
+main(int argc, char **argv)
+{
+        char command[1024];
+        char *cmd_ptr = command;
+        char *cmd_end = command + sizeof(command);
+        int status;
+        int c, i;
+        int gid_is_set = 0;
+        int uid_is_set = 0;
+        uid_t user_id = 0;
+        gid_t grp_id = 0;
+
+        if (argc == 1)
+                Usage_and_abort();
+
+        // Parse -u (required) and -g (optional; defaults to the uid value).
+        while ((c = getopt(argc, argv, "u:g:h")) != -1) {
+                switch (c) {
+                case 'u':
+                        user_id = (uid_t)atoi(optarg);
+                        uid_is_set = 1;
+                        if (!gid_is_set)
+                                grp_id = (gid_t)user_id;
+                        break;
+
+                case 'g':
+                        grp_id = (gid_t)atoi(optarg);
+                        gid_is_set = 1;
+                        break;
+
+                case 'h':
+                        Usage_and_abort();
+                        break;
+
+                default:
+                        // Unknown options are ignored (getopt already printed
+                        // a diagnostic).  A label must be followed by a
+                        // statement, so the break here is required for the
+                        // switch to compile at all.
+                        break;
+                }
+        }
+
+        if (!uid_is_set)
+                Usage_and_abort();
+
+        if (optind == argc) {
+                fprintf(stderr, "Bad parameters.\n");
+                Usage_and_abort();
+        }
+
+        // Assemble the command, bounds-checked: the previous unbounded
+        // sprintf could overflow command[] given a long argument list.
+        for (i = optind; i < argc; i++) {
+                int n = snprintf(cmd_ptr, (size_t)(cmd_end - cmd_ptr),
+                                 "%s ", argv[i]);
+                if (n < 0 || n >= cmd_end - cmd_ptr) {
+                        fprintf(stderr, "%s: command line too long\n",
+                                argv[0]);
+                        exit(-1);
+                }
+                cmd_ptr += n;
+        }
+
+#if DEBUG
+        system("whoami");
+#endif
+
+        // Drop the group first, then the user: the reverse order would fail
+        // once root privileges are gone.
+        // NOTE(review): supplementary groups are not dropped here (no
+        // setgroups() call) -- confirm that is acceptable for these tests.
+        status = setregid(grp_id, grp_id);
+        if (status == -1) {
+                fprintf(stderr, "Cannot change grp_ID to %d, errno=%d (%s)\n",
+                        (int)grp_id, errno, strerror(errno));
+                exit(-1);
+        }
+
+        status = setreuid(user_id, user_id);
+        if (status == -1) {
+                fprintf(stderr, "Cannot change user_ID to %d, errno=%d (%s)\n",
+                        (int)user_id, errno, strerror(errno));
+                exit(-1);
+        }
+
+#if DEBUG
+        system("whoami");
+#endif
+
+        fprintf(stdout, "running as USER(%d), Grp (%d): \"%s\" \n",
+                (int)user_id, (int)grp_id, command);
+
+        // Run the command and pass its exit status out of this wrapper.
+        status = system(command);
+        if (status == -1) {
+                fprintf(stderr, "%s: system() command failed to run\n",
+                        argv[0]);
+        } else if (WIFEXITED(status)) {
+                status = WEXITSTATUS(status);
+                // The old message fed this exit code to strerror(), which is
+                // only meaningful for errno values; just report the code.
+                fprintf(stderr, "[%s #%d] \"%s\" returns %d.\n", argv[0],
+                        (int)user_id, argv[optind], status);
+        } else {
+                // The child was terminated by a signal rather than exiting;
+                // WEXITSTATUS would be garbage in that case.
+                fprintf(stderr, "[%s #%d] \"%s\" terminated abnormally.\n",
+                        argv[0], (int)user_id, argv[optind]);
+                status = -1;
+        }
+
+        return status;
+}
+
log '== mkdir .../d7; mcreate .../d7/f2; echo foo > .../d7/f2 = test 7b'
$MCREATE $DIR/d7/f2
-log -n foo > $DIR/d7/f2
+echo -n foo > $DIR/d7/f2
[ "`cat $DIR/d7/f2`" = "foo" ] || error
$CHECKSTAT -t file -s 3 $DIR/d7/f2 || error
pass
$CLEAN
$START
+
log '== O_CREAT|O_EXCL in subdir ====================== test 23'
mkdir $DIR/d23
$TOEXCL $DIR/d23/f23
done
[ "`cat $MOUNT1/f11`" = "abcdefghijkl" ] && pass || error
-rm -f $MOUNT1/f[0-9]* $MOUNT1/lnk
+echo "test 12: file length and contents across mounts"
+dd if=$SHELL of=$MOUNT1/f12 bs=4096 count=1
+$CHECKSTAT -s 4096 $MOUNT1/f12 $MOUNT2/f12 || error
+dd if=$SHELL bs=4096 count=1 | \
+ md5sum - $MOUNT1/f12 $MOUNT2/f12 | ( \
+ read GOODSUM DASH; \
+ while read SUM FILE ; do \
+ [ $SUM == $GOODSUM ] || exit 2; \
+ done; ) || error
+
+echo "test 13: open(,O_TRUNC,), close() across mounts"
+dd if=$SHELL of=$MOUNT1/f13 bs=4096 count=1
+> $MOUNT1/f13
+$CHECKSTAT -s 0 $MOUNT1/f13 $MOUNT2/f13 || error
+
+echo "test 14: file extension while holding the fd open"
+> $MOUNT1/f14
+# ugh.
+touch $MOUNT1/f14-start
+sh -c "
+ echo -n a;
+ mv $MOUNT1/f14-start $MOUNT1/f14-going;
+ while [ -f $MOUNT1/f14-going ] ; do sleep 1; done;
+ " >> $MOUNT1/f14 &
+while [ -f $MOUNT1/f14-start ] ; do sleep 1; done;
+$CHECKSTAT -s 1 $MOUNT1/f14 $MOUNT2/f14 || error
+rm $MOUNT1/f14-going
+rm -f $MOUNT1/f[0-9]* $MOUNT1/lnk
$CLEAN
exit
#include <stdlib.h>
#include <errno.h>
#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/stat.h>
// not correctly in the headers yet!!
//#define O_DIRECT 0
#define O_DIRECT 040000 /* direct disk access hint */
#endif
-#define BLOCKSIZE 4096
#define CERROR(fmt, arg...) fprintf(stderr, fmt, ## arg)
#ifndef __u64
#define __u64 long long
long long count, last, offset;
long pg_vec, len;
long long objid = 3;
+ struct stat st;
int flags = 0;
int cmd = 0;
char *end;
usage(argv[0]);
}
}
- len = pg_vec * BLOCKSIZE;
- last = (long long)count * len;
if (argc >= 6) {
objid = strtoull(argv[5], &end, 0);
argv[0], flags & O_DIRECT ? "directio" : "i/o",
argv[1], objid, count, pg_vec);
- buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0);
- if (!buf) {
- fprintf(stderr, "%s: no buffer memory %s\n",
- argv[0], strerror(errno));
- return 2;
- }
-
fd = open(argv[1], flags | O_LARGEFILE);
if (fd == -1) {
fprintf(stderr, "%s: cannot open %s: %s\n", argv[0],
return 3;
}
+ rc = fstat(fd, &st);
+ if (rc < 0) {
+ fprintf(stderr, "%s: cannot stat %s: %s\n", argv[0],
+ argv[1], strerror(errno));
+ return 4;
+ }
+
+ len = pg_vec * st.st_blksize;
+ last = (long long)count * len;
+
+ buf = mmap(0, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0);
+ if (!buf) {
+ fprintf(stderr, "%s: no buffer memory %s\n",
+ argv[0], strerror(errno));
+ return 2;
+ }
+
for (offset = 0; offset < last && cmd & WRITE; offset += len) {
int i;
- for (i = 0; i < len; i += BLOCKSIZE)
- page_debug_setup(buf + i, BLOCKSIZE, offset + i, objid);
+ for (i = 0; i < len; i += st.st_blksize)
+ page_debug_setup(buf + i, st.st_blksize, offset + i,
+ objid);
rc = write(fd, buf, len);
- for (i = 0; i < len; i += BLOCKSIZE) {
- if (page_debug_check("write", buf + i, BLOCKSIZE,
+ for (i = 0; i < len; i += st.st_blksize) {
+ if (page_debug_check("write", buf + i, st.st_blksize,
offset + i, objid))
return 10;
}
if (rc != len) {
- fprintf(stderr, "%s: write error: %s, rc %d\n",
- argv[0], strerror(errno), rc);
+ fprintf(stderr, "%s: write error: %s, rc %d != %ld\n",
+ argv[0], strerror(errno), rc, len);
return 4;
}
}
rc = read(fd, buf, len);
if (rc != len) {
- fprintf(stderr, "%s: read error: %s, rc %d\n",
- argv[0], strerror(errno), rc);
+ fprintf(stderr, "%s: read error: %s, rc %d != %ld\n",
+ argv[0], strerror(errno), rc, len);
return 6;
}
- for (i = 0; i < len; i += BLOCKSIZE) {
- if (page_debug_check("read", buf + i, BLOCKSIZE,
+ for (i = 0; i < len; i += st.st_blksize) {
+ if (page_debug_check("read", buf + i, st.st_blksize,
offset + i, objid))
return 11;
}
{"add_uuid", jt_obd_add_uuid, 0, "associate a UUID with a nid\n"
"usage: add_uuid <uuid> <nid> <net_type>"},
{"close_uuid", jt_obd_close_uuid, 0, "disconnect a UUID\n"
- "usage: close_uuid <uuid>)"},
+ "usage: close_uuid <uuid> <net-type>)"},
{"del_uuid", jt_obd_del_uuid, 0, "delete a UUID association\n"
"usage: del_uuid <uuid>"},
{"add_route", jt_ptl_add_route, 0,
struct obd_ioctl_data data;
if (argc != 3) {
- fprintf(stderr, "usage: %s <uuid>\n", argv[0]);
+ fprintf(stderr, "usage: %s <uuid> <net-type>\n", argv[0]);
return 0;
}
}
void
-usage (char *cmdname, int help)
+usage (char *cmdname, int help)
{
char *name = strrchr (cmdname, '/');
-
+
if (name == NULL)
name = cmdname;
-
+
fprintf (help ? stdout : stderr,
"usage: %s -d device -s size -o offset [-i id][-n reps][-l] oid\n",
name);
int
exponential_modulus (int i, int base)
{
-        int top = base;
-        int mod = 1;
-
-        for (;;) {
-                if (i < top)
-                        return (i%mod == 0);
-
-                mod = top;
-                top *= base;
-        }
+        /* Returns non-zero when i is a multiple of the largest power of
+         * `base` that is <= i (1, base, base^2, ...); the caller uses this
+         * to print progress messages at exponentially growing intervals. */
+        int top = base;
+        int mod = 1;
+
+        for (;;) {
+                if (i < top)
+                        return (i%mod == 0);
+
+                mod = top;
+                top *= base;
+        }
}
int
-main (int argc, char **argv)
+main (int argc, char **argv)
{
uint64_t bid = (((uint64_t)gethostid()) << 32) | getpid ();
int set_bid = 0;
uint64_t oid;
- int setup = 0;
+ int setup = 0;
int device = -1;
- int npeers = 0;
+ int npeers = 0;
int reps = 1;
char hostname[128];
struct obdio_conn *conn;
- struct obdio_barrier *b;
- char *end;
+ struct obdio_barrier *b;
+ char *end;
uint64_t val;
int rc;
int c;
memset (hostname, 0, sizeof (hostname));
gethostname (hostname, sizeof (hostname));
hostname[sizeof(hostname) - 1] = 0;
-
+
while ((c = getopt (argc, argv, "hsi:d:n:p:")) != -1)
switch (c) {
case 'h':
usage (argv[0], 1);
return (0);
-
+
case 'i':
bid = strtoll (optarg, &end, 0);
if (end == optarg || *end != 0) {
}
set_bid = 1;
break;
-
+
case 's':
- setup = 1;
+ setup = 1;
break;
-
+
case 'd':
device = strtol (optarg, &end, 0);
if (end == optarg || *end != 0 || device < 0) {
case 'p':
npeers = strtol (optarg, &end, 0);
- if (end == optarg || *end != 0 || npeers <= 0) {
+ if (end == optarg || *end != 0 || npeers <= 0) {
fprintf (stderr, "Can't parse npeers %s\n",
optarg);
return (1);
if ((!setup && !set_bid) ||
npeers <= 0 ||
- device < 0 ||
+ device < 0 ||
optind == argc) {
fprintf (stderr, "%s not specified\n",
(!setup && !set_bid) ? "id" :
device < 0 ? "device" : "object id");
return (1);
}
-
+
oid = strtoull (argv[optind], &end, 0);
if (end == argv[optind] || *end != 0) {
fprintf (stderr, "Can't parse object id %s\n",
argv[optind]);
return (1);
}
-
+
conn = obdio_connect (device);
if (conn == NULL)
return (1);
- b = obdio_new_barrier (oid, bid, npeers);
- if (b == NULL)
- return (1);
+ b = obdio_new_barrier (oid, bid, npeers);
+ if (b == NULL)
+ return (1);
rc = 0;
- if (setup) {
- rc = obdio_setup_barrier (conn, b);
+ if (setup) {
+ rc = obdio_setup_barrier (conn, b);
if (rc == 0)
printf ("Setup barrier: -d %d -i "LPX64" -p %d -n1 "LPX64"\n",
device, bid, npeers, oid);
- } else {
- for (c = 0; c < reps; c++) {
- rc = obdio_barrier (conn, b);
- if (rc != 0)
- break;
- if (exponential_modulus (c, 10))
- printf ("%s: Barrier %d\n", hostname, c);
- }
- }
-
- free (b);
-
+ } else {
+ for (c = 0; c < reps; c++) {
+ rc = obdio_barrier (conn, b);
+ if (rc != 0)
+ break;
+ if (exponential_modulus (c, 10))
+ printf ("%s: Barrier %d\n", hostname, c);
+ }
+ }
+
+ free (b);
+
obdio_disconnect (conn);
return (rc == 0 ? 0 : 1);
#include "obdiolib.h"
int
-obdio_test_fixed_extent (struct obdio_conn *conn,
- uint32_t myhid, uint32_t mypid,
- int reps, int locked, uint64_t oid,
+obdio_test_fixed_extent (struct obdio_conn *conn,
+ uint32_t myhid, uint32_t mypid,
+ int reps, int locked, uint64_t oid,
uint64_t offset, uint32_t size)
{
struct lustre_handle fh;
int j;
int rc;
int rc2;
-
+
rc = obdio_open (conn, oid, &fh);
if (rc != 0) {
fprintf (stderr, "Failed to open object "LPX64": %s\n",
rc = -1;
goto out_0;
}
-
+
for (i = 0; i < reps; i++) {
ibuf = (uint32_t *) buffer;
for (j = 0; j < size / (4 * sizeof (*ibuf)); j++) {
goto out_1;
}
}
-
+
rc = obdio_pwrite (conn, oid, buffer, size, offset);
if (rc != 0) {
fprintf (stderr, "Error writing "LPX64" @ "LPU64" for %u: %s\n",
rc = -1;
goto out_1;
}
-
+
memset (buffer, 0xbb, size);
-
+
rc = obdio_pread (conn, oid, buffer, size, offset);
if (rc != 0) {
fprintf (stderr, "Error reading "LPX64" @ "LPU64" for %u: %s\n",
goto out_1;
}
}
-
+
ibuf = (uint32_t *) buffer;
for (j = 0; j < size / (4 * sizeof (*ibuf)); j++) {
if (ibuf[0] != myhid ||
}
void
-usage (char *cmdname, int help)
+usage (char *cmdname, int help)
{
char *name = strrchr (cmdname, '/');
-
+
if (name == NULL)
name = cmdname;
-
+
fprintf (help ? stdout : stderr,
"usage: %s -d device -s size -o offset [-i id][-n reps][-l] oid\n",
name);
}
int
-main (int argc, char **argv)
+main (int argc, char **argv)
{
uint32_t mypid = getpid ();
uint32_t myhid = gethostid ();
case 'h':
usage (argv[0], 1);
return (0);
-
+
case 'i':
switch (sscanf (optarg, "%i.%i", &v1, &v2)) {
case 1:
return (1);
}
break;
-
+
case 's':
if (parse_kmg (&val, optarg) != 0) {
fprintf (stderr, "Can't parse size %s\n",
size = (uint32_t)val;
set_size++;
break;
-
+
case 'o':
if (parse_kmg (&val, optarg) != 0) {
fprintf (stderr, "Can't parse offset %s\n",
device < 0 ? "device" : "object id");
return (1);
}
-
+
oid = strtoull (argv[optind], &end, 0);
if (end == argv[optind] || *end != 0) {
fprintf (stderr, "Can't parse object id %s\n",
argv[optind]);
return (1);
}
-
+
conn = obdio_connect (device);
if (conn == NULL)
return (1);
-
- rc = obdio_test_fixed_extent (conn, myhid, mypid, reps, locked,
+
+ rc = obdio_test_fixed_extent (conn, myhid, mypid, reps, locked,
oid, base_offset, size);
-
+
obdio_disconnect (conn);
return (rc == 0 ? 0 : 1);
}
int
-obdio_ioctl (struct obdio_conn *conn, int cmd)
+obdio_ioctl (struct obdio_conn *conn, int cmd)
{
char *buf = conn->oc_buffer;
int rc;
int rc2;
-
+
rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
if (rc != 0) {
- fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n",
+ fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n",
rc, strerror (errno));
abort ();
}
-
+
rc = ioctl (conn->oc_fd, cmd, buf);
if (rc != 0)
return (rc);
-
+
rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
if (rc2 != 0) {
fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
rc2, strerror (errno));
abort ();
}
-
+
return (rc);
}
return (NULL);
}
memset (conn, 0, sizeof (*conn));
-
- conn->oc_fd = open ("/dev/obd", O_RDWR);
- if (conn->oc_fd < 0) {
+
+ conn->oc_fd = open ("/dev/obd", O_RDWR);
+ if (conn->oc_fd < 0) {
fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
strerror (errno));
goto failed;
device, strerror (errno));
goto failed;
}
-
+
obdio_iocinit (conn);
rc = obdio_ioctl (conn, OBD_IOC_CONNECT);
if (rc != 0) {
device, strerror (errno));
goto failed;
}
-
+
conn->oc_conn_addr = conn->oc_data.ioc_addr;
conn->oc_conn_cookie = conn->oc_data.ioc_cookie;
return (conn);
-
+
failed:
free (conn);
return (NULL);
}
void
-obdio_disconnect (struct obdio_conn *conn)
+obdio_disconnect (struct obdio_conn *conn)
{
close (conn->oc_fd);
/* obdclass will automatically close on last ref */
}
int
-obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
+obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
{
int rc;
-
+
obdio_iocinit (conn);
-
+
conn->oc_data.ioc_obdo1.o_id = oid;
conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
-
+
rc = obdio_ioctl (conn, OBD_IOC_OPEN);
-
+
if (rc == 0)
memcpy (fh, obdo_handle(&conn->oc_data.ioc_obdo1), sizeof (*fh));
}
int
-obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
+obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
{
obdio_iocinit (conn);
-
+
conn->oc_data.ioc_obdo1.o_id = oid;
conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), fh, sizeof (*fh));
- conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
+ conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
OBD_MD_FLMODE | OBD_MD_FLHANDLE;
-
+
return (obdio_ioctl (conn, OBD_IOC_CLOSE));
}
int
-obdio_pread (struct obdio_conn *conn, uint64_t oid,
- char *buffer, uint32_t count, uint64_t offset)
+obdio_pread (struct obdio_conn *conn, uint64_t oid,
+ char *buffer, uint32_t count, uint64_t offset)
{
obdio_iocinit (conn);
-
+
conn->oc_data.ioc_obdo1.o_id = oid;
conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
}
int
-obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
- char *buffer, uint32_t count, uint64_t offset)
+obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
+ char *buffer, uint32_t count, uint64_t offset)
{
obdio_iocinit (conn);
-
+
conn->oc_data.ioc_obdo1.o_id = oid;
conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
struct lustre_handle *lh)
{
int rc;
-
+
obdio_iocinit (conn);
-
+
conn->oc_data.ioc_obdo1.o_id = oid;
conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
conn->oc_data.ioc_conn1 = mode;
conn->oc_data.ioc_count = count;
conn->oc_data.ioc_offset = offset;
-
+
rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
-
+
if (rc == 0)
memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
-
+
return (rc);
}
memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
-
+
return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
}
void *
-obdio_alloc_aligned_buffer (void **spacep, int size)
+obdio_alloc_aligned_buffer (void **spacep, int size)
{
int pagesize = getpagesize();
void *space = malloc (size + pagesize - 1);
-
+
*spacep = space;
if (space == NULL)
return (NULL);
-
+
return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
}
struct obdio_barrier *
-obdio_new_barrier (uint64_t oid, uint64_t id, int npeers)
+obdio_new_barrier (uint64_t oid, uint64_t id, int npeers)
{
-        struct obdio_barrier *b;
-
-        b = (struct obdio_barrier *)malloc (sizeof (*b));
-        if (b == NULL) {
-                fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
-                return (NULL);
-        }
-
-        b->ob_id = id;
-        b->ob_oid = oid;
-        b->ob_npeers = npeers;
-        b->ob_ordinal = 0;
-        b->ob_count = 0;
-        return (b);
+        /* Allocate a barrier descriptor for object `oid`, barrier instance
+         * `id`, shared by `npeers` processes.  Returns NULL (after printing
+         * a message) if allocation fails; the caller frees the result. */
+        struct obdio_barrier *b;
+
+        b = (struct obdio_barrier *)malloc (sizeof (*b));
+        if (b == NULL) {
+                fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
+                return (NULL);
+        }
+
+        b->ob_id = id;
+        b->ob_oid = oid;
+        b->ob_npeers = npeers;
+        b->ob_ordinal = 0;
+        b->ob_count = 0;
+        return (b);
}
int
void *space;
struct obdio_barrier *fileb;
- if (b->ob_ordinal != 0 ||
- b->ob_count != 0) {
- fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
- abort ();
- }
-
+ if (b->ob_ordinal != 0 ||
+ b->ob_count != 0) {
+ fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
+ abort ();
+ }
+
rc = obdio_open (conn, b->ob_oid, &fh);
if (rc != 0) {
fprintf (stderr, "obdio_setup_barrier "LPX64": Failed to open object: %s\n",
b->ob_oid, strerror (errno));
return (rc);
}
-
+
fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
if (fileb == NULL) {
fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
- b->ob_oid);
+ b->ob_oid);
rc = -1;
goto out_0;
}
-
+
memset (fileb, 0, getpagesize ());
- *fileb = *b;
-
+ *fileb = *b;
+
rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
if (rc != 0) {
fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
b->ob_oid, strerror (errno));
goto out_1;
}
-
+
rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
- if (rc != 0)
- fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
- b->ob_oid, strerror (errno));
-
- rc2 = obdio_cancel (conn, &lh);
- if (rc == 0 && rc2 != 0) {
- fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
- b->ob_oid, strerror (errno));
- rc = rc2;
- }
+ if (rc != 0)
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
+ b->ob_oid, strerror (errno));
+
+ rc2 = obdio_cancel (conn, &lh);
+ if (rc == 0 && rc2 != 0) {
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
+ b->ob_oid, strerror (errno));
+ rc = rc2;
+ }
out_1:
- free (space);
+ free (space);
out_0:
- rc2 = obdio_close (conn, b->ob_oid, &fh);
- if (rc == 0 && rc2 != 0) {
- fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n",
- b->ob_oid, strerror (errno));
- rc = rc2;
- }
-
- return (rc);
+ rc2 = obdio_close (conn, b->ob_oid, &fh);
+ if (rc == 0 && rc2 != 0) {
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n",
+ b->ob_oid, strerror (errno));
+ rc = rc2;
+ }
+
+ return (rc);
}
int
obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
{
struct lustre_handle fh;
- struct lustre_handle lh;
- int rc;
- int rc2;
+ struct lustre_handle lh;
+ int rc;
+ int rc2;
void *space;
struct obdio_barrier *fileb;
- char *mode;
-
- rc = obdio_open (conn, b->ob_oid, &fh);
- if (rc != 0) {
- fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n",
- b->ob_oid, strerror (errno));
- return (rc);
- }
-
+ char *mode;
+
+ rc = obdio_open (conn, b->ob_oid, &fh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n",
+ b->ob_oid, strerror (errno));
+ return (rc);
+ }
+
fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
- if (fileb == NULL) {
- fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
- b->ob_oid);
- rc = -1;
- goto out_0;
- }
+ if (fileb == NULL) {
+ fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
+ b->ob_oid);
+ rc = -1;
+ goto out_0;
+ }
rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
if (rc != 0) {
b->ob_oid, strerror (errno));
goto out_1;
}
-
- memset (fileb, 0xeb, getpagesize ());
- rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
- if (rc != 0) {
- fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
- b->ob_oid, strerror (errno));
- goto out_2;
- }
-
- if (fileb->ob_id != b->ob_id ||
- fileb->ob_oid != b->ob_oid ||
- fileb->ob_npeers != b->ob_npeers ||
- fileb->ob_count >= b->ob_npeers ||
- fileb->ob_ordinal != b->ob_ordinal) {
- fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
- fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
- fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
- fileb->ob_ordinal, fileb->ob_count);
- fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
- b->ob_id, b->ob_oid, b->ob_npeers,
- b->ob_ordinal, b->ob_count);
- rc = -1;
- goto out_2;
- }
-
- fileb->ob_count++;
- if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
- fileb->ob_count = 0; /* join count for next barrier */
- fileb->ob_ordinal++; /* signal all joined */
- }
-
- rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
- if (rc != 0) {
- fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
- b->ob_oid, strerror (errno));
- goto out_2;
- }
-
- mode = "PW";
- b->ob_ordinal++; /* now I wait... */
- while (fileb->ob_ordinal != b->ob_ordinal) {
-
- rc = obdio_cancel (conn, &lh);
- if (rc != 0) {
- fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
- b->ob_oid, mode, strerror (errno));
- goto out_1;
- }
-
- mode = "PR";
- rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
- if (rc != 0) {
- fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
- b->ob_oid, strerror (errno));
- goto out_1;
- }
-
- memset (fileb, 0xeb, getpagesize ());
- rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
- if (rc != 0) {
- fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
- b->ob_oid, strerror (errno));
- goto out_2;
- }
-
- if (fileb->ob_id != b->ob_id ||
- fileb->ob_oid != b->ob_oid ||
- fileb->ob_npeers != b->ob_npeers ||
- fileb->ob_count >= b->ob_npeers ||
- (fileb->ob_ordinal != b->ob_ordinal - 1 &&
- fileb->ob_ordinal != b->ob_ordinal)) {
- fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
- fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
- fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
- fileb->ob_ordinal, fileb->ob_count);
- fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
- b->ob_id, b->ob_oid, b->ob_npeers,
- b->ob_ordinal, b->ob_count);
- rc = -1;
- goto out_2;
- }
- }
-
+
+ memset (fileb, 0xeb, getpagesize ());
+ rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_2;
+ }
+
+ if (fileb->ob_id != b->ob_id ||
+ fileb->ob_oid != b->ob_oid ||
+ fileb->ob_npeers != b->ob_npeers ||
+ fileb->ob_count >= b->ob_npeers ||
+ fileb->ob_ordinal != b->ob_ordinal) {
+ fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
+ fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+ fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
+ fileb->ob_ordinal, fileb->ob_count);
+ fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+ b->ob_id, b->ob_oid, b->ob_npeers,
+ b->ob_ordinal, b->ob_count);
+ rc = -1;
+ goto out_2;
+ }
+
+ fileb->ob_count++;
+ if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
+ fileb->ob_count = 0; /* join count for next barrier */
+ fileb->ob_ordinal++; /* signal all joined */
+ }
+
+ rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_2;
+ }
+
+ mode = "PW";
+ b->ob_ordinal++; /* now I wait... */
+ while (fileb->ob_ordinal != b->ob_ordinal) {
+
+ rc = obdio_cancel (conn, &lh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
+ b->ob_oid, mode, strerror (errno));
+ goto out_1;
+ }
+
+ mode = "PR";
+ rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_1;
+ }
+
+ memset (fileb, 0xeb, getpagesize ());
+ rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_2;
+ }
+
+ if (fileb->ob_id != b->ob_id ||
+ fileb->ob_oid != b->ob_oid ||
+ fileb->ob_npeers != b->ob_npeers ||
+ fileb->ob_count >= b->ob_npeers ||
+ (fileb->ob_ordinal != b->ob_ordinal - 1 &&
+ fileb->ob_ordinal != b->ob_ordinal)) {
+ fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
+ fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+ fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
+ fileb->ob_ordinal, fileb->ob_count);
+ fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+ b->ob_id, b->ob_oid, b->ob_npeers,
+ b->ob_ordinal, b->ob_count);
+ rc = -1;
+ goto out_2;
+ }
+ }
+
out_2:
- rc2 = obdio_cancel (conn, &lh);
- if (rc == 0 && rc2 != 0) {
- fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
- b->ob_oid, strerror (errno));
- rc = rc2;
- }
+ rc2 = obdio_cancel (conn, &lh);
+ if (rc == 0 && rc2 != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
+ b->ob_oid, strerror (errno));
+ rc = rc2;
+ }
out_1:
- free (space);
+ free (space);
out_0:
- rc2 = obdio_close (conn, b->ob_oid, &fh);
- if (rc == 0 && rc2 != 0) {
- fprintf (stderr, "obdio_barrier "LPX64": Error on close: %s\n",
- b->ob_oid, strerror (errno));
- rc = rc2;
- }
-
- return (rc);
+ rc2 = obdio_close (conn, b->ob_oid, &fh);
+ if (rc == 0 && rc2 != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on close: %s\n",
+ b->ob_oid, strerror (errno));
+ rc = rc2;
+ }
+
+ return (rc);
}
-
+