super->pccs_async_threshold = PCC_DEFAULT_ASYNC_THRESHOLD;
super->pccs_dio_attach_iosize = PCC_DEFAULT_DIO_ATTACH_IOSIZE;
super->pccs_dio_attach_threshold = PCC_DEFAULT_DIO_ATTACH_THRESHOLD;
+ super->pccs_dio_attach_threads_per_file =
+ PCC_DEFAULT_DIO_ATTACH_THREADS_PER_FILE;
super->pccs_mode = S_IRUSR;
atomic_set(&super->pccs_attaches_queued, 0);
super->pccs_maximum_queued_attaches = PCCS_DEFAULT_ATTACH_QUEUE_DEPTH;
argv[7] = "-s",
argv[8] = "", /* iosize */
argv[9] = "-d", /* use DIO */
- argv[10] = NULL,
+ argv[10] = "-t",
+ argv[11] = "", /* numthreads */
+ argv[12] = NULL,
};
+ __u32 numthreads = super->pccs_dio_attach_threads_per_file;
__u32 iosize = super->pccs_dio_attach_iosize;
/* iosize is __u32, 2^32 is 4 billion, which is 10 digits, so + 1 for nul */
#define IOSIZE_MAXLEN 11
char iosize_str[IOSIZE_MAXLEN];
+/* max number of threads is 255, so 3 digits + 1 for nul */
+#define NUMTHREADS_MAXLEN 4
+ char numthreads_str[NUMTHREADS_MAXLEN];
char fidstring[FID_LEN];
ssize_t rc = 0;
struct lu_fid *srcfid = NULL;
RETURN(-E2BIG);
}
argv[8] = iosize_str;
+ if (snprintf(numthreads_str, NUMTHREADS_MAXLEN, "%u", numthreads) >=
+ NUMTHREADS_MAXLEN) {
+ RETURN(-E2BIG);
+ }
+ argv[11] = numthreads_str;
rc = call_usermodehelper(argv[0], argv, envp, UMH_WAIT_PROC);
if (rc != 0) {
- CERROR("%s: Error copying data for attach, Lustre file: %s, PCC file: %s, Lustre mntpath %s, iosize %u, pid %d rc: %lu:%zd.\n",
+ CERROR("%s: Error copying data for attach, Lustre file: %s, PCC file: %s, Lustre mntpath %s, iosize %u, threads %d, pid %d rc: %lu:%zd.\n",
ll_i2sbi(file_inode(lu_file))->ll_fsname, fidstring,
pcc_filepath, super->pccs_lu_pathname, iosize,
- current->pid, rc, rc);
+ numthreads, current->pid, rc, rc);
}
RETURN(rc);
}
static ssize_t pcc_copy_data(struct pcc_super *super, struct file *lu_file,
struct file *pcc_file, char *pcc_pathname,
- __u64 size, bool atomic_open_locked)
+ __u64 size, bool atomic_open_locked, bool use_dio)
{
- if (pcc_pathname) {
+ if (use_dio) {
#ifndef HAVE_INODE_RWSEM
int rc;
struct dentry *dentry = file_dentry(lu_file);
char *pcc_pathname = NULL;
struct path pcc_path;
ktime_t kstart = ktime_get();
+ bool use_dio = false;
ssize_t ret;
int flags = O_WRONLY | O_LARGEFILE;
int rc;
super->pccs_dio_attach_iosize != 0) {
int pathlen;
+ use_dio = true;
+
OBD_ALLOC(pcc_pathname, PATH_MAX);
if (!pcc_pathname)
GOTO(out, rc = -ENOMEM);
}
ret = pcc_copy_data(super, lu_file, pcc_file, pcc_pathname, filesize,
- atomic_open_locked);
+ atomic_open_locked, use_dio);
if (direct)
lu_file->f_flags |= O_DIRECT;
if (ret < 0)
local file=$DIR/$tfile
local src=$file.src
local dest=$TMP/$tfile
+ local numthreads=2
+
dd if=/dev/urandom of=$src bs=1M count=32 ||
error "dd to create source file ($src) failed"
touch $dest
stack_trap "rm -f $dest" EXIT
- # Simple test of normal copy behavior
+ # Simple test of normal copy behavior, single threaded
+ # (all other tests are multithreaded, which is the main operating mode
+ # for actual copies)
local iosize=$((1024 * 1024)) # 1 MiB
ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize ||
error "(0) fid_path_copy of $srcfid ($src) to $dest failed"
rm -f $dest
touch $dest
- # Normal copy but with DIO
- ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d ||
+ # Normal copy but with DIO and multiple threads
+ ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads ||
error "(2) fid_path_copy of $srcfid ($src) to $dest failed"
cmp -bl $src $dest ||
error "(3) $dest corrupt after copy from $srcfid ($src)"
error "(4) dd to create source file failed"
srcfid=$($LFS getstripe -F "$src")
# DIO copy with unaligned file size
- ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d ||
+ ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads ||
error "(5) fid_path_copy of $srcfid ($src) to $dest failed"
cmp -bl $src $dest ||
error "(6) $dest corrupt after copy from $srcfid ($src)"
dd if=/dev/urandom of=$src bs=2383K count=1 ||
error "(7) dd to create source file failed"
srcfid=$($LFS getstripe -F "$src")
- ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d ||
+ ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads ||
error "(8) fid_path_copy of $srcfid ($src) to $dest failed"
cmp -bl $src $dest ||
error "(9) $dest corrupt after copy from $srcfid ($src)"
ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d &&
error "(15) invalid fid_path_copy succeeded"
+ #Invalid thread count
+ numthreads=0
+ ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads &&
+ error "(16) invalid fid_path_copy succeeded"
+
+ numthreads=512
+ ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads &&
+ error "(17) invalid fid_path_copy succeeded"
+
rm -f $dest
touch $dest
iosize=$((1024 * 1024))
echo "non-blksize-aligned file size for PCC backend (blksize = 512)"
dd if=/dev/urandom of=$src bs=2383001 count=1 ||
- error "(16) dd to create source file failed"
+ error "(18) dd to create source file failed"
srcfid=$($LFS getstripe -F "$src")
ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d ||
- error "(17) fid_path_copy of $srcfid ($src) to $dest failed"
+ error "(19) fid_path_copy of $srcfid ($src) to $dest failed"
cmp -bl $src $dest ||
- error "(18) $dest corrupt after copy from $srcfid ($src)"
+ error "(20) $dest corrupt after copy from $srcfid ($src)"
echo "Test completed successfully"
}
local dio_threshold=$(do_facet $SINGLEAGT $LCTL get_param -n llite.*.pcc_dio_attach_threshold | head -n 1)
stack_trap "do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threshold=$dio_threshold" EXIT
+ local dio_attach_threads=$(do_facet $SINGLEAGT $LCTL get_param -n llite.*.pcc_dio_attach_threads_per_file | head -n 1)
+ stack_trap "do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=$dio_attach_threads" EXIT
echo "Testing that invalid inputs should be clamped to [min, max] bound"
do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_iosize_mb=300 ||
do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_iosize_mb=0 ||
error "should be able to set attach size to 0"
+ do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=0 &&
+ error "shouldn't be able to set threads to 0"
+
+ do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=257 &&
+ error "shouldn't be able to set threads to more than 256"
+
echo "Normal testing from here on - no errors expected."
copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
do_facet $SINGLEAGT $LFS pcc detach -k $file ||
error "failed to detach file $file"
check_lpcc_state $file "none"
+
+ # Test attach with 4 threads
+ do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=4 ||
+ error "failed to set threads to 4"
+
+ rm -f $file || error "failed to remove $file"
+
+ # Unaligned file size > DIO attach size (will attach with DIO)
+ dd if=/dev/urandom of=$file.src bs=20005K count=1 ||
+ error "failed to dd write to $file.src"
+
+ dd if=$file.src of=$file bs=20005K count=1 ||
+ error "failed dd to write $file"
+
+ echo "Start to RO-PCC attach/detach the file: $file"
+ do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+ error "failed to attach file $file"
+ check_lpcc_state $file "readonly"
+ cmp -bl $file.src $file || error "$file corrupt after attach"
+ do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+ error "failed to detach file $file"
+ check_lpcc_state $file "none"
}
run_test 49b "Basic testing of DIO attach and sanity testing of tunables"
-#include <stdio.h>
-#include <unistd.h>
#include <errno.h>
+#include <fcntl.h>
#include <getopt.h>
+#include <pthread.h>
#include <stdlib.h>
-#include <lustre/lustreapi.h>
+#include <stdio.h>
+#include <unistd.h>
#include <linux/lustre/lustre_idl.h>
+#include <lustre/lustreapi.h>
#include <sys/types.h>
#include <sys/stat.h>
-#include <fcntl.h>
+#include <sys/syscall.h>
+#include "lstddef.h"
+
void usage(void)
{
- printf("Usage: %s: [OPTION]...\n"
+ printf("Usage: %s [OPTION]...\n"
"Copy data from a file on Lustre (specified by fid) to a file specified by path\n"
"\n"
"Mandatory arguments to long options are mandatory for short options too.\n"
-" -d, --direct_io use direct I/O\n"
-" -F, --source_fid [fid] fid of source file for copy\n"
-" -h, --help display this help and exit\n"
-" -l, --lustre_dir [dir] path to a directory on Lustre\n"
-" -p, --destination_path [path] path of destination for copy\n"
-" -s, --iosize [size] do reads/writes of [size] bytes, must be 1 MiB aligned\n",
+" -d, --direct_io use direct I/O\n"
+" -F, --source_fid [fid] fid of source file for copy\n"
+" -h, --help display this help and exit\n"
+" -l, --lustre_dir [dir] path to a directory on Lustre\n"
+" -p, --destination_path [path] path of destination for copy\n"
+" -s, --iosize [size] do reads/writes of [size] bytes, must be 1 MiB aligned\n"
+" -t, --threads [num] number of threads (default: 1)\n",
"ll_fid_path_copy");
}
-static ssize_t copy_data(int src_fd, int dst_fd, size_t iosize,
- int extra_open_flags)
+struct chunk_data {
+ ssize_t* copied_total;
+ int extra_open_flags;
+ size_t iosize;
+ off_t offset;
+ off_t end_offset;
+ int src_fd;
+ int dst_fd;
+};
+
+void *copy_data_threaded(void *arg)
{
+ struct chunk_data *chunk = arg;
+ int extra_open_flags = chunk->extra_open_flags;
+ ssize_t* copied_total = chunk->copied_total;
+ off_t end_offset = chunk->end_offset;
size_t page_size = sysconf(_SC_PAGESIZE);
- size_t copied_total = 0;
- void *buf = NULL;
- off_t offset = 0;
+ size_t iosize = chunk->iosize;
+ off_t offset = chunk->offset;
+ int src_fd = chunk->src_fd;
+ int dst_fd = chunk->dst_fd;
+ void* buf = NULL;
ssize_t rc = 0;
+ long int thread = syscall(__NR_gettid);
rc = posix_memalign(&buf, page_size, iosize);
if (rc) {
- fprintf(stderr, "%s: memalign failed: rc = %ld\n",
- program_invocation_short_name, rc);
- return rc;
+ fprintf(stderr, "%s: memalign failed, thread %ld, size %lu: rc = %ld\n",
+ program_invocation_short_name, thread, iosize, rc);
+ return NULL;
}
- while (true) {
+ while (1) {
ssize_t rsz;
ssize_t wsz;
if (rsz < 0) {
rc = -errno;
fprintf(stderr,
- "%s: failed to read from source, rc = %ld\n",
- program_invocation_short_name, rc);
+ "%s: failed to read from source thread %ld, rc = %ld\n",
+ program_invocation_short_name, thread, rc);
break;
}
+
/* EOF, copy is done */
if (rsz == 0)
break;
-retry_write:
+ /* Retry writing if interrupted */
+ retry_write:
wsz = pwrite(dst_fd, buf, rsz, offset);
if (wsz < 0) {
rc = -errno;
- /*
- * Unaligned direct IO reports error code -EINVAL
- * on the local file system (such as Ext4).
- * In this case, fallback to buffered IO any way.
+
+ /* Unaligned direct IO reports error code -EINVAL on the
+ * local file system (such as Ext4). In this case,
+ * fallback to buffered IO.
*/
if (extra_open_flags & O_DIRECT && rc == -EINVAL) {
extra_open_flags &= ~O_DIRECT;
rc = fcntl(dst_fd, F_SETFL, extra_open_flags);
if (rc < 0) {
rc = -errno;
- fprintf(stderr, "%s: failed to clear O_DIRECT flag: rc = %ld",
- program_invocation_short_name,
- rc);
+ fprintf(stderr,
+ "%s: failed to clear O_DIRECT flag: rc = %ld",
+ program_invocation_short_name, rc);
break;
}
rc = 0;
break;
}
- copied_total += wsz;
+ *copied_total += wsz;
offset += wsz;
+ if (offset == end_offset)
+ break;
}
- if (rc == 0)
- rc = copied_total;
-
free(buf);
- return rc;
-
+ return NULL;
}
-static ssize_t ll_fid_path_copy(const char *lustre_path,
- const struct lu_fid *src_fid,
- const char *dst_path, size_t iosize,
- int extra_open_flags)
+static ssize_t ll_fid_path_copy(const char* lustre_path,
+ const struct lu_fid* src_fid,
+ const char* dst_path, size_t iosize,
+ int extra_open_flags, int num_threads)
{
+ struct chunk_data *thread_chunks = NULL;
+ ssize_t* copied_totals = NULL;
+ pthread_t* threads = NULL;
+ int max_threads_for_size;
struct stat stbuf;
+ off_t chunk_size;
+ off_t offset = 0;
int src_fd = -1;
int dst_fd = -1;
ssize_t rc = 0;
+ int i;
+ /* Open source file */
src_fd = llapi_open_by_fid(lustre_path, src_fid,
O_RDONLY | extra_open_flags);
if (src_fd < 0) {
rc = -errno;
fprintf(stderr,
- "%s: failed to open source %s/"DFID": rc = %ld\n",
+ "%s: failed to open source %s/" DFID ": rc = %ld\n",
program_invocation_short_name, lustre_path,
PFID(src_fid), rc);
return rc;
}
+ /* Get source file information */
rc = fstat(src_fd, &stbuf);
if (rc < 0) {
fprintf(stderr,
- "%s: failed to stat source %s/"DFID": rc = %ld\n",
+ "%s: failed to stat source %s/" DFID ": rc = %ld\n",
program_invocation_short_name, lustre_path,
PFID(src_fid), rc);
goto out;
}
+ /* Open destination file */
dst_fd = open(dst_path, O_WRONLY | O_LARGEFILE | extra_open_flags);
if (dst_fd < 0) {
rc = -errno;
- fprintf(stderr, "%s: failed to open destination %s: rc = %ld\n",
+ fprintf(stderr,
+ "%s: failed to open destination %s: rc = %ld\n",
program_invocation_short_name, dst_path, rc);
goto out;
}
- rc = copy_data(src_fd, dst_fd, iosize, extra_open_flags);
- if (rc < 0) {
- errno = -rc;
- fprintf(stderr,
- "%s: failed to copy data from %s/"DFID" to %s: rc = %ld.\n",
- program_invocation_short_name, lustre_path,
- PFID(src_fid), dst_path, rc);
- goto out;
+ /* Calculate the maximum number of threads that can be used based on
+ * file size and iosize.
+ *
+ * This assumes the cost of starting a thread is broadly similar to the
+ * cost of doing a single IO, which is reasonable because threads are
+ * cheap to spawn and read-write ops are relatively expensive.
+ *
+ * But because of synchronization/waiting overhead, etc, we do at least
+ * 2 instead of 1 IO per thread.
+ */
+ max_threads_for_size = stbuf.st_size / (iosize * 2);
+
+ if (max_threads_for_size == 0)
+ max_threads_for_size = 1;
+
+ /* Adjust the number of threads based on the file size and iosize */
+ num_threads = min(num_threads, max_threads_for_size);
+
+ /* Calculate the initial chunk_size with the adjusted num_threads */
+ chunk_size = (stbuf.st_size + num_threads - 1) / num_threads;
+
+ /* Round chunk_size to the next iosize */
+ chunk_size += iosize - (chunk_size % iosize);
+
+ /* allocate thread tracking info */
+ threads = malloc(num_threads * sizeof(pthread_t));
+ copied_totals = malloc(num_threads * sizeof(ssize_t));
+ thread_chunks = malloc(num_threads * sizeof(struct chunk_data));
+
+ for (i = 0; i < num_threads; i++) {
+ int result;
+
+ copied_totals[i] = 0;
+
+ thread_chunks[i].src_fd = src_fd;
+ thread_chunks[i].dst_fd = dst_fd;
+ thread_chunks[i].iosize = iosize;
+ thread_chunks[i].extra_open_flags = extra_open_flags;
+ thread_chunks[i].offset = offset;
+ thread_chunks[i].end_offset = offset + chunk_size;
+ thread_chunks[i].copied_total = &copied_totals[i];
+
+ result = pthread_create(&threads[i], NULL, (void* (*)(void*))copy_data_threaded,
+ &thread_chunks[i]);
+ if (result != 0) {
+ fprintf(stdout,
+ "%s: failed to create thread %d: %d\n",
+ program_invocation_short_name, i, result);
+ rc = -result;
+ goto join_threads;
+ }
+
+ offset += chunk_size;
+ }
+
+join_threads:
+ for (i = 0; i < num_threads; i++) {
+ if (pthread_join(threads[i], NULL) != 0) {
+ fprintf(stdout, "%s: failed to join thread %d\n",
+ program_invocation_short_name, i);
+ rc = -1;
+ }
+
+ /* Accumulate the total copied bytes */
+ rc += copied_totals[i];
}
if (rc != stbuf.st_size) {
- fprintf(stderr,
+ fprintf(stdout,
"%s: incomplete copy: copied %lu bytes of %lu\n",
program_invocation_short_name, rc, stbuf.st_size);
rc = -EIO;
}
+
out:
if (src_fd > 0)
close(src_fd);
if (dst_fd > 0)
close(dst_fd);
+ free(threads);
+ free(copied_totals);
+ free(thread_chunks);
+
return rc;
}
-int main(int argc, char *argv[])
-{
+#define MAX_THREADS 255
+
+int main(int argc, char* argv[]) {
static struct option options[] = {
{ .name = "direct_io", .has_arg = no_argument, .val = 'd' },
{ .name = "src_fid", .has_arg = required_argument, .val = 'F' },
{ .name = "lustre_path", .has_arg = required_argument, .val = 'l' },
{ .name = "dst_path", .has_arg = required_argument, .val = 'p' },
{ .name = "iosize", .has_arg = required_argument, .val = 's' },
+ { .name = "threads", .has_arg = required_argument, .val = 't' },
};
- char *dst_path = NULL;
- char *lustre_path = NULL;
- struct lu_fid src_fid;
- char *fidstr = NULL;
int extra_open_flags = 0;
- __u32 iosize = 0;
- ssize_t rc;
+ char* lustre_path = NULL;
+ char* dst_path = NULL;
+ struct lu_fid src_fid;
+ char* fidstr = NULL;
+ int num_threads = 1;
+ size_t iosize = 0;
+ ssize_t rc = 0;
int c;
- while ((c = getopt_long(argc, argv, "dF:hl:p:s:", options, NULL)) != -1) {
+ while ((c = getopt_long(argc, argv, "dF:hl:p:s:t:", options, NULL)) != -1) {
switch (c) {
case 'd':
extra_open_flags = O_DIRECT;
case 's':
iosize = strtol(optarg, NULL, 0);
break;
+ case 't':
+ num_threads = atoi(optarg);
+ break;
}
}
}
if (iosize % (1024 * 1024) != 0) {
- fprintf(stderr, "%s: iosize (%u) must be 1 MiB aligned\n",
+ fprintf(stderr, "%s: iosize (%lu) must be 1 MiB aligned\n",
program_invocation_short_name, iosize);
usage();
exit(EXIT_FAILURE);
}
- fprintf(stdout,
- "%s: copying from %s to %s, iosize %d, using %s i/o.\n",
- program_invocation_short_name, fidstr, dst_path, iosize,
- extra_open_flags == 0 ? "buffered" : "direct");
+ if (num_threads < 1 || num_threads > MAX_THREADS) {
+ fprintf(stderr,
+ "%s: number of threads (%d) must be > 0 and <= %u\n",
+ program_invocation_short_name, num_threads,
+ MAX_THREADS);
+ usage();
+ exit(EXIT_FAILURE);
+ }
+
+ fprintf(stdout, "Copying from %s to %s, iosize %lu, using %s I/O with %d threads\n",
+ fidstr, dst_path, iosize,
+ (extra_open_flags == 0 ? "buffered" : "direct"), num_threads);
rc = llapi_fid_parse(fidstr, &src_fid, NULL);
if (rc < 0) {
- fprintf(stderr, "%s: failed to parse fid: %s\n",
+ fprintf(stderr, "%s: Failed to parse fid: %s\n",
program_invocation_short_name, fidstr);
exit(EXIT_FAILURE);
}
-
rc = ll_fid_path_copy(lustre_path, &src_fid, dst_path, iosize,
- extra_open_flags);
+ extra_open_flags, num_threads);
if (rc < 0)
exit(EXIT_FAILURE);
- fprintf(stdout,
- "%s: successfully copied %lu bytes from %s to %s.\n",
- program_invocation_short_name, rc, fidstr, dst_path);
+ fprintf(stdout, "Successfully copied %lu bytes from %s to %s.\n",
+ rc, fidstr, dst_path);
exit(EXIT_SUCCESS);
}
+