/**
* Sequential read hints.
*/
- ci_seq_read:1;
+ ci_seq_read:1,
+ /**
+ * Do parallel (async) submission of DIO RPCs. Note DIO is still sync
+ * to userspace, only the RPCs are submitted async, then waited for at
+ * the llite layer before returning.
+ */
+ ci_parallel_dio:1;
/**
* Bypass quota check
*/
void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
struct cl_dio_aio *aio, cl_sync_io_end_t *end);
-int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
- long timeout);
+int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
+ long timeout);
void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
int ioret);
+int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
+ long timeout, int ioret);
struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb);
void cl_aio_free(struct cl_dio_aio *aio);
static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
struct osc_page *ops);
int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops);
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags);
int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ll_file_data *fd = file->private_data;
struct range_lock range;
+ bool range_locked = false;
struct cl_io *io;
ssize_t result = 0;
int rc = 0;
+ int rc2 = 0;
unsigned int retried = 0, dio_lock = 0;
bool is_aio = false;
+ bool is_parallel_dio = false;
struct cl_dio_aio *ci_aio = NULL;
size_t per_bytes;
bool partial_io = false;
if (file->f_flags & O_DIRECT) {
if (!is_sync_kiocb(args->u.normal.via_iocb))
is_aio = true;
+
+ /* the kernel does not support AIO on pipes, and parallel DIO
+ * uses part of the AIO path, so we must not do parallel dio
+ * to pipes
+ */
+ is_parallel_dio = !iov_iter_is_pipe(args->u.normal.via_iter) &&
+ !is_aio;
+
+ if (!ll_sbi_has_parallel_dio(sbi))
+ is_parallel_dio = false;
+
ci_aio = cl_aio_alloc(args->u.normal.via_iocb);
if (!ci_aio)
GOTO(out, rc = -ENOMEM);
io->ci_aio = ci_aio;
io->ci_dio_lock = dio_lock;
io->ci_ndelay_tried = retried;
+ io->ci_parallel_dio = is_parallel_dio;
if (cl_io_rw_init(env, io, iot, *ppos, per_bytes) == 0) {
- bool range_locked = false;
-
if (file->f_flags & O_APPEND)
range_lock_init(&range, 0, LUSTRE_EOF);
else
rc = cl_io_loop(env, io);
ll_cl_remove(file, env);
- if (range_locked) {
+ if (range_locked && !is_parallel_dio) {
CDEBUG(D_VFSTRACE, "Range unlock "RL_FMT"\n",
RL_PARA(&range));
range_unlock(&lli->lli_write_tree, &range);
+ range_locked = false;
}
} else {
/* cl_io_rw_init() handled IO */
rc = io->ci_result;
}
+ /* N/B: parallel DIO may be disabled during i/o submission;
+ * if that occurs, async RPCs are resolved before we get here, and this
+ * wait call completes immediately.
+ */
+ if (is_parallel_dio) {
+ struct cl_sync_io *anchor = &io->ci_aio->cda_sync;
+
+ /* for dio, EIOCBQUEUED is an implementation detail,
+ * and we don't return it to userspace
+ */
+ if (rc == -EIOCBQUEUED)
+ rc = 0;
+
+ rc2 = cl_sync_io_wait_recycle(env, anchor, 0, 0);
+ if (rc2 < 0)
+ rc = rc2;
+
+ if (range_locked) {
+ range_unlock(&lli->lli_write_tree, &range);
+ range_locked = false;
+ }
+ }
+
/*
* In order to move forward AIO, ci_nob was increased,
* but that doesn't mean io have been finished, it just
*/
if (io->ci_nob > 0) {
if (!is_aio) {
- result += io->ci_nob;
- *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+ if (rc2 == 0) {
+ result += io->ci_nob;
+ *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+ } else if (rc2) {
+ result = 0;
+ }
}
count -= io->ci_nob;
#define LL_SBI_FOREIGN_SYMLINK 0x20000000 /* foreign fake-symlink support */
/* foreign fake-symlink upcall registered */
#define LL_SBI_FOREIGN_SYMLINK_UPCALL 0x40000000
+#define LL_SBI_PARALLEL_DIO 0x80000000 /* parallel (async) submission of
+ RPCs for DIO */
#define LL_SBI_FLAGS { \
"nolck", \
"checksum", \
"noencrypt", \
"foreign_symlink", \
"foreign_symlink_upcall", \
+ "parallel_dio", \
}
/* This is embedded into llite super-blocks to keep track of connect
return !!(sbi->ll_flags & LL_SBI_FOREIGN_SYMLINK);
}
+/* true if parallel (async) submission of DIO RPCs is enabled for @sbi */
+static inline bool ll_sbi_has_parallel_dio(struct ll_sb_info *sbi)
+{
+	return !!(sbi->ll_flags & LL_SBI_PARALLEL_DIO);
+}
+
void ll_ras_enter(struct file *f, loff_t pos, size_t count);
/* llite/lcommon_misc.c */
sbi->ll_flags |= LL_SBI_AGL_ENABLED;
sbi->ll_flags |= LL_SBI_FAST_READ;
sbi->ll_flags |= LL_SBI_TINY_WRITE;
+ sbi->ll_flags |= LL_SBI_PARALLEL_DIO;
ll_sbi_set_encrypt(sbi, true);
/* root squash */
}
LUSTRE_RW_ATTR(tiny_write);
+/*
+ * Sysfs read handler for llite.*.parallel_dio: prints "1\n" if parallel
+ * (async) submission of DIO RPCs is enabled for this mount, else "0\n".
+ */
+static ssize_t parallel_dio_show(struct kobject *kobj,
+				 struct attribute *attr,
+				 char *buf)
+{
+	struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+					      ll_kset.kobj);
+
+	return snprintf(buf, PAGE_SIZE, "%u\n",
+			!!(sbi->ll_flags & LL_SBI_PARALLEL_DIO));
+}
+
+/*
+ * Sysfs write handler for llite.*.parallel_dio: enables or disables
+ * parallel (async) submission of DIO RPCs for this mount.  Accepts any
+ * kstrtobool-compatible input ("0"/"1", "y"/"n", "on"/"off", ...).
+ */
+static ssize_t parallel_dio_store(struct kobject *kobj,
+				  struct attribute *attr,
+				  const char *buffer,
+				  size_t count)
+{
+	struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+					      ll_kset.kobj);
+	bool val;
+	int rc;
+
+	rc = kstrtobool(buffer, &val);
+	if (rc)
+		return rc;
+
+	/* ll_lock serializes read-modify-write updates of ll_flags so a
+	 * concurrent store of another flag bit is not lost */
+	spin_lock(&sbi->ll_lock);
+	if (val)
+		sbi->ll_flags |= LL_SBI_PARALLEL_DIO;
+	else
+		sbi->ll_flags &= ~LL_SBI_PARALLEL_DIO;
+	spin_unlock(&sbi->ll_lock);
+
+	return count;
+}
+LUSTRE_RW_ATTR(parallel_dio);
+
static ssize_t max_read_ahead_async_active_show(struct kobject *kobj,
struct attribute *attr,
char *buf)
&lustre_attr_xattr_cache.attr,
&lustre_attr_fast_read.attr,
&lustre_attr_tiny_write.attr,
+ &lustre_attr_parallel_dio.attr,
&lustre_attr_file_heat.attr,
&lustre_attr_heat_decay_percentage.attr,
&lustre_attr_heat_period_second.attr,
out:
aio->cda_bytes += tot_bytes;
- if (is_sync_kiocb(iocb)) {
- struct cl_sync_io *anchor = &aio->cda_sync;
- ssize_t rc2;
+ if (rw == WRITE)
+ vio->u.readwrite.vui_written += tot_bytes;
+ else
+ vio->u.readwrite.vui_read += tot_bytes;
- /**
- * @anchor was inited as 1 to prevent end_io to be
- * called before we add all pages for IO, so drop
- * one extra reference to make sure we could wait
- * count to be zero.
- */
- cl_sync_io_note(env, anchor, result);
+ /* If async dio submission is not allowed, we must wait here. */
+ if (is_sync_kiocb(iocb) && !io->ci_parallel_dio) {
+ ssize_t rc2;
- rc2 = cl_sync_io_wait(env, anchor, 0);
+ rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0);
if (result == 0 && rc2)
result = rc2;
- /**
- * One extra reference again, as if @anchor is
- * reused we assume it as 1 before using.
- */
- atomic_add(1, &anchor->csi_sync_nr);
- if (result == 0) {
- /* no commit async for direct IO */
- vio->u.readwrite.vui_written += tot_bytes;
- result = tot_bytes;
- }
- } else {
- if (rw == WRITE)
- vio->u.readwrite.vui_written += tot_bytes;
- else
- vio->u.readwrite.vui_read += tot_bytes;
+
if (result == 0)
- result = -EIOCBQUEUED;
+ result = tot_bytes;
+ } else if (result == 0) {
+ result = -EIOCBQUEUED;
}
return result;
* of relying on VFS, we move iov iter by ourselves.
*/
iov_iter_advance(vio->vui_iter, nob);
+ CDEBUG(D_VFSTRACE, "advancing %ld bytes\n", nob);
vio->vui_tot_count -= nob;
iov_iter_reexpand(vio->vui_iter, vio->vui_tot_count);
}
EXIT;
}
EXPORT_SYMBOL(cl_sync_io_note);
+
+
+/**
+ * Wait for all i/o tracked by @anchor to complete, then re-arm @anchor
+ * so it can be reused for another batch of i/o.
+ *
+ * \param[in] env		thread-local execution environment
+ * \param[in] anchor		sync anchor tracking outstanding transfers
+ * \param[in] timeout		passed through to cl_sync_io_wait(); callers
+ *				here pass 0 (presumably "no timeout" — confirm
+ *				against cl_sync_io_wait())
+ * \param[in] ioret		i/o result recorded via cl_sync_io_note()
+ *
+ * \retval 0 on success, negative errno from cl_sync_io_wait() on failure
+ */
+int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
+			    long timeout, int ioret)
+{
+	int rc = 0;
+
+	/*
+	 * @anchor was inited as 1 to prevent end_io to be
+	 * called before we add all pages for IO, so drop
+	 * one extra reference to make sure we could wait
+	 * count to be zero.
+	 */
+	cl_sync_io_note(env, anchor, ioret);
+	/* Wait for completion of normal dio.
+	 * This replaces the EIOCBQUEUED return from the DIO/AIO
+	 * path, and this is where AIO and DIO implementations
+	 * split.
+	 */
+	rc = cl_sync_io_wait(env, anchor, timeout);
+	/*
+	 * One extra reference again, as if @anchor is
+	 * reused we assume it as 1 before using.
+	 */
+	atomic_add(1, &anchor->csi_sync_nr);
+
+	return rc;
+}
+EXPORT_SYMBOL(cl_sync_io_wait_recycle);
return rc;
}
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags)
{
grants += (1 << cli->cl_chunkbits) *
((page_count + ppc - 1) / ppc);
+ CDEBUG(D_CACHE, "requesting %d bytes grant\n", grants);
spin_lock(&cli->cl_loi_list_lock);
if (osc_reserve_grant(cli, grants) == 0) {
list_for_each_entry(oap, list, oap_pending_item) {
}
osc_unreserve_grant_nolock(cli, grants, 0);
ext->oe_grants = grants;
+ } else {
+ /* We cannot report ENOSPC correctly if we do parallel
+ * DIO (async RPC submission), so turn off parallel dio
+ * if there is not sufficient grant available. This
+ * makes individual RPCs synchronous.
+ */
+ io->ci_parallel_dio = false;
+ CDEBUG(D_CACHE,
+ "not enough grant available, switching to sync for this i/o\n");
}
spin_unlock(&cli->cl_loi_list_lock);
}
req->rq_bulk_read = 1;
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
- RETURN(-EIO);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK)) {
+ /* optionally use cfs_fail_val - 1 to select a specific OST on
+ * this server to fail requests.
+ */
+ char fail_ost_name[MAX_OBD_NAME];
+
+ if (cfs_fail_val > 0) {
+ snprintf(fail_ost_name, MAX_OBD_NAME, "OST%04X",
+ cfs_fail_val - 1);
+
+ if (strstr(obd_name, fail_ost_name))
+ RETURN(err_serious(-EIO));
+ } else {
+ RETURN(err_serious(-EIO));
+ }
+ }
OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ?
cfs_fail_val : (obd_timeout + 1) / 4);
struct lustre_handle lockh = {0};
__u32 *rcs;
int objcount, niocount, npages;
- int rc, i, j;
+ int rc = 0;
+ int i, j;
enum cksum_types cksum_type = OBD_CKSUM_CRC32;
bool no_reply = false, mmap;
struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
req->rq_bulk_write = 1;
if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
- RETURN(err_serious(-EIO));
+ rc = -EIO;
if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
- RETURN(err_serious(-EFAULT));
+ rc = -EFAULT;
+ if (rc < 0) {
+ /* optionally use cfs_fail_val - 1 to select a specific OST on
+ * this server to fail requests.
+ */
+ char fail_ost_name[MAX_OBD_NAME];
+
+ if (cfs_fail_val > 0) {
+ snprintf(fail_ost_name, MAX_OBD_NAME, "OST%04X",
+ cfs_fail_val - 1);
+
+ if (strstr(obd_name, fail_ost_name))
+ RETURN(err_serious(rc));
+ } else {
+ RETURN(err_serious(rc));
+ }
+ }
/* pause before transaction has been started */
CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ?
DIR=${DIR:-$MOUNT}
assert_DIR
-MAXFREE=${MAXFREE:-$((200000 * $OSTCOUNT))}
+MAXFREE=${MAXFREE:-$((300000 * $OSTCOUNT))}
[ -f $DIR/d52a/foo ] && chattr -a $DIR/d52a/foo
[ -f $DIR/d52b/foo ] && chattr -i $DIR/d52b/foo
}
run_test 398f "verify aio handles ll_direct_rw_pages errors correctly"
+# NB: To get the parallel DIO behavior in LU-13798, there must be > 1
+# stripe and i/o size must be > stripe size
+# Old style synchronous DIO waits after submitting each chunk, resulting in a
+# single RPC in flight. This test shows async DIO submission is working by
+# showing multiple RPCs in flight.
+test_398g() { # LU-13798
+	$LFS setstripe -o 0,0 -S 1M $DIR/$tfile
+
+	# We need to do some i/o first to acquire enough grant to put our RPCs
+	# in flight; otherwise a new connection may not have enough grant
+	# available
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=1 oflag=direct ||
+		error "parallel dio failed"
+	stack_trap "rm -f $DIR/$tfile"
+
+	# Reduce RPC size to 1M to avoid combination into larger RPCs
+	local pages_per_rpc=$($LCTL get_param osc.*-OST0000-*.max_pages_per_rpc)
+	$LCTL set_param osc.*-OST0000-*.max_pages_per_rpc=1M
+	stack_trap "$LCTL set_param -n $pages_per_rpc"
+
+	# Recreate file so it's empty
+	rm -f $DIR/$tfile
+	$LFS setstripe -o 0,0 -S 1M $DIR/$tfile
+	# Pause rpc completion to guarantee we see multiple rpcs in flight
+	#define OBD_FAIL_OST_BRW_PAUSE_BULK
+	do_facet ost1 $LCTL set_param fail_loc=0x214 fail_val=2
+	stack_trap "do_facet ost1 $LCTL set_param fail_loc=0"
+
+	# Clear rpc stats
+	$LCTL set_param osc.*.rpc_stats=c
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=1 oflag=direct ||
+		error "parallel dio failed"
+
+	$LCTL get_param osc.*-OST0000-*.rpc_stats
+	pct=$($LCTL get_param osc.*-OST0000-*.rpc_stats |
+		grep -A 8 'rpcs in flight' | grep -v 'rpcs in flight' |
+		grep "8:" | awk '{print $8}')
+	# We look at the "8 rpcs in flight" field, and verify A) it is present
+	# and B) it includes all RPCs. This proves we had 8 RPCs in flight,
+	# as expected for an 8M DIO to a file with 1M stripes.
+	# Quote $pct so an empty match reports the error rather than a
+	# "[: -eq: unary operator expected" shell failure.
+	[ "$pct" = "100" ] || error "we should see 8 RPCs in flight"
+
+	# Verify turning off parallel dio works as expected
+	# Clear rpc stats
+	$LCTL set_param osc.*.rpc_stats=c
+	$LCTL set_param llite.*.parallel_dio=0
+	stack_trap '$LCTL set_param llite.*.parallel_dio=1'
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=1 oflag=direct ||
+		error "dio with parallel dio disabled failed"
+
+	# Ideally, we would see only one RPC in flight here, but there is an
+	# unavoidable race between i/o completion and RPC in flight counting,
+	# so while only 1 i/o is in flight at a time, the RPC in flight counter
+	# will sometimes exceed 1 (3 or 4 is not rare on VM testing).
+	# So instead we just verify it's always < 8.
+	$LCTL get_param osc.*-OST0000-*.rpc_stats
+	ret=$($LCTL get_param osc.*-OST0000-*.rpc_stats |
+		grep -A 8 'rpcs in flight' | grep -v 'rpcs in flight' |
+		grep '^$' -B1 | grep . | awk '{print $1}')
+	[ "$ret" != "8:" ] ||
+		error "we should see fewer than 8 RPCs in flight (saw $ret)"
+}
+run_test 398g "verify parallel dio async RPC submission"
+
+# Verify data correctness of parallel DIO when a single i/o spans many
+# stripes (i/o size >> stripe size).
+test_398h() { # LU-13798
+	local dio_file=$DIR/$tfile.dio
+
+	$LFS setstripe -C 2 -S 1M $DIR/$tfile $dio_file
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 oflag=direct
+	stack_trap "rm -f $DIR/$tfile $dio_file"
+
+	dd if=$DIR/$tfile of=$dio_file bs=8M count=8 iflag=direct oflag=direct ||
+		error "parallel dio failed"
+	diff $DIR/$tfile $dio_file
+	# message previously said "aiocp", but this test copies with dd
+	[[ $? == 0 ]] || error "file diff after parallel dio copy"
+}
+run_test 398h "verify correctness of read & write with i/o size >> stripe size"
+
+# Verify parallel DIO fails cleanly (no crash, error returned) when page
+# allocation fails inside ll_direct_rw_pages.
+test_398i() { # LU-13798
+	local dio_file=$DIR/$tfile.dio
+
+	$LFS setstripe -C 2 -S 1M $DIR/$tfile $dio_file
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 oflag=direct
+	stack_trap "rm -f $DIR/$tfile $dio_file"
+
+	#define OBD_FAIL_LLITE_PAGE_ALLOC 0x1418
+	$LCTL set_param fail_loc=0x1418
+	# must clear the fail_loc or it poisons every later test
+	stack_trap "$LCTL set_param fail_loc=0"
+	# make sure we don't crash and fail properly
+	dd if=$DIR/$tfile of=$dio_file bs=8M count=8 iflag=direct oflag=direct &&
+		error "parallel dio page allocation failure succeeded"
+	diff $DIR/$tfile $dio_file
+	[[ $? != 0 ]] || error "no diff after failed parallel dio copy"
+}
+run_test 398i "verify parallel dio handles ll_direct_rw_pages errors correctly"
+
+test_398j() { # LU-13798
+	# Stripe size > RPC size but less than i/o size tests split across
+	# stripes and RPCs for individual i/o op
+	$LFS setstripe -o 0,0 -S 4M $DIR/$tfile $DIR/$tfile.2
+
+	# Reduce RPC size to 1M to guarantee split to multiple RPCs per stripe
+	local pages_per_rpc=$($LCTL get_param osc.*-OST0000-*.max_pages_per_rpc)
+	$LCTL set_param osc.*-OST0000-*.max_pages_per_rpc=1M
+	stack_trap "$LCTL set_param -n $pages_per_rpc"
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 oflag=direct ||
+		error "parallel dio write failed"
+	stack_trap "rm -f $DIR/$tfile $DIR/$tfile.2"
+
+	# NB: only the read side is DIO here (iflag=direct); the copy's
+	# write to $tfile.2 goes through the page cache
+	dd if=$DIR/$tfile of=$DIR/$tfile.2 bs=8M count=8 iflag=direct ||
+		error "parallel dio read failed"
+	diff $DIR/$tfile $DIR/$tfile.2
+	[[ $? == 0 ]] || error "file diff after parallel dio read"
+}
+run_test 398j "test parallel dio where stripe size > rpc_size"
+
+# Verify ENOSPC on the very first stripe of a parallel DIO write is
+# reported as a full failure (file stays 0 bytes).
+test_398k() { # LU-13798
+	wait_delete_completed
+	wait_mds_ost_sync
+
+	# 4 stripe file; we will cause out of space on OST0
+	$LFS setstripe -o 0,1,0,1 -S 1M $DIR/$tfile
+
+	# Fill OST0 (if it's not too large)
+	ORIGFREE=$($LCTL get_param -n lov.$FSNAME-clilov-*.kbytesavail |
+		   head -n1)
+	if [[ $ORIGFREE -gt $MAXFREE ]]; then
+		skip "$ORIGFREE > $MAXFREE skipping out-of-space test on OST0"
+	fi
+	$LFS setstripe -i 0 -c 1 $DIR/$tfile.1
+	dd if=/dev/zero of=$DIR/$tfile.1 bs=1024 count=$MAXFREE &&
+		error "dd should fill OST0"
+	stack_trap "rm -f $DIR/$tfile.1"
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 oflag=direct
+	err=$?
+	# register cleanup before the checks below, so $tfile is still
+	# removed if one of them calls error()
+	stack_trap "rm -f $DIR/$tfile"
+
+	ls -la $DIR/$tfile
+	$CHECKSTAT -t file -s 0 $DIR/$tfile ||
+		error "file is not 0 bytes in size"
+
+	# dd above should not succeed, but don't error until here so we can
+	# get debug info above
+	[[ $err != 0 ]] ||
+		error "parallel dio write with enospc succeeded"
+}
+run_test 398k "test enospc on first stripe"
+
+test_398l() { # LU-13798
+	wait_delete_completed
+	wait_mds_ost_sync
+
+	# 4 stripe file; we will cause out of space on OST0
+	# Note the 1M stripe size and the > 1M i/o size mean this ENOSPC
+	# happens on the second i/o chunk we issue
+	$LFS setstripe -o 1,0,1,0 -S 1M $DIR/$tfile $DIR/$tfile.2
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=2 oflag=direct
+	stack_trap "rm -f $DIR/$tfile"
+
+	# Fill OST0 (if it's not too large)
+	ORIGFREE=$($LCTL get_param -n lov.$FSNAME-clilov-*.kbytesavail |
+		   head -n1)
+	if [[ $ORIGFREE -gt $MAXFREE ]]; then
+		skip "$ORIGFREE > $MAXFREE skipping out-of-space test on OST0"
+	fi
+	$LFS setstripe -i 0 -c 1 $DIR/$tfile.1
+	dd if=/dev/zero of=$DIR/$tfile.1 bs=1024 count=$MAXFREE &&
+		error "dd should fill OST0"
+	stack_trap "rm -f $DIR/$tfile.1"
+
+	# Stripe 0 is on OST1 (has space) and stripe 1 on the now-full OST0,
+	# so the copy should stop after the first 1M chunk succeeds
+	dd if=$DIR/$tfile of=$DIR/$tfile.2 bs=8M count=8 oflag=direct
+	err=$?
+	stack_trap "rm -f $DIR/$tfile.2"
+
+	# Check that short write completed as expected
+	ls -la $DIR/$tfile.2
+	$CHECKSTAT -t file -s 1048576 $DIR/$tfile.2 ||
+		error "file is not 1M in size"
+
+	# dd above should not succeed, but don't error until here so we can
+	# get debug info above
+	[[ $err != 0 ]] ||
+		error "parallel dio write with enospc succeeded"
+
+	# Truncate source file to same length as output file and diff them
+	$TRUNCATE $DIR/$tfile 1048576
+	diff $DIR/$tfile $DIR/$tfile.2
+	[[ $? == 0 ]] || error "data incorrect after short write"
+}
+run_test 398l "test enospc on intermediate stripe/RPC"
+
+# Verify bulk RPC failures on the first and second stripes are handled
+# cleanly for both parallel DIO writes and reads.
+test_398m() { # LU-13798
+	$LFS setstripe -o 0,1,0,1 -S 1M $DIR/$tfile
+
+	# NOTE(review): full debug with a 10GB buffer is left enabled for the
+	# whole run and never restored; confirm this is not leftover debugging
+	$LCTL set_param *debug=-1 debug_mb=10000
+
+	# Set up failure on OST0, the first stripe:
+	#define OBD_FAIL_OST_BRW_WRITE_BULK 0x20e
+	#NB: Fail val is ost # + 1, because we cannot use cfs_fail_val = 0
+	# So this fail_val specifies OST0
+	do_facet ost1 $LCTL set_param fail_loc=0x20e fail_val=1
+	stack_trap "do_facet ost1 $LCTL set_param fail_loc=0"
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 oflag=direct &&
+		error "parallel dio write with failure on first stripe succeeded"
+	stack_trap "rm -f $DIR/$tfile"
+	do_facet ost1 $LCTL set_param fail_loc=0 fail_val=0
+
+	# Place data in file for read
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 oflag=direct ||
+		error "parallel dio write failed"
+
+	# Fail read on OST0, first stripe
+	#define OBD_FAIL_OST_BRW_READ_BULK 0x20f
+	do_facet ost1 $LCTL set_param fail_loc=0x20f fail_val=1
+	dd if=$DIR/$tfile of=$DIR/$tfile.2 bs=8M count=8 iflag=direct &&
+		error "parallel dio read with error on first stripe succeeded"
+	rm -f $DIR/$tfile.2
+	do_facet ost1 $LCTL set_param fail_loc=0 fail_val=0
+
+	# Switch to testing on OST1, second stripe
+	# Clear file contents, maintain striping
+	echo > $DIR/$tfile
+	# Set up failure on OST1, second stripe; OST0001 is served by the
+	# ost2 facet (the read case below already used ost2)
+	do_facet ost2 $LCTL set_param fail_loc=0x20e fail_val=2
+	stack_trap "do_facet ost2 $LCTL set_param fail_loc=0"
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 oflag=direct &&
+		error "parallel dio write with failure on second stripe succeeded"
+	do_facet ost2 $LCTL set_param fail_loc=0 fail_val=0
+
+	# Place data in file for read
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 oflag=direct ||
+		error "parallel dio write failed"
+
+	# Fail read on OST1, second stripe
+	#define OBD_FAIL_OST_BRW_READ_BULK 0x20f
+	do_facet ost2 $LCTL set_param fail_loc=0x20f fail_val=2
+	dd if=$DIR/$tfile of=$DIR/$tfile.2 bs=8M count=8 iflag=direct &&
+		error "parallel dio read with error on second stripe succeeded"
+	rm -f $DIR/$tfile.2
+	do_facet ost2 $LCTL set_param fail_loc=0 fail_val=0
+}
+run_test 398m "test RPC failures with parallel dio"
+
+# Parallel submission of DIO should not cause problems for append, but it's
+# important to verify.
+test_398n() { # LU-13798
+	$LFS setstripe -C 2 -S 1M $DIR/$tfile
+
+	dd if=/dev/urandom of=$DIR/$tfile bs=8M count=8 ||
+		error "dd to create source file failed"
+	# rm -f on the not-yet-created $tfile.1 is harmless, so one trap
+	# covers both files
+	stack_trap "rm -f $DIR/$tfile $DIR/$tfile.1"
+
+	dd if=$DIR/$tfile of=$DIR/$tfile.1 bs=8M count=8 oflag=direct,append ||
+		error "parallel dio append write failed"
+	diff $DIR/$tfile $DIR/$tfile.1
+	[[ $? == 0 ]] || error "data incorrect after append"
+}
+run_test 398n "test append with parallel DIO"
+
test_fake_rw() {
local read_write=$1
if [ "$read_write" = "write" ]; then