From aa00fdfbffcd83dab66a424857ea01a6daeef3ad Mon Sep 17 00:00:00 2001 From: Patrick Farrell Date: Tue, 30 May 2023 17:48:15 -0400 Subject: [PATCH] EX-7585 pcc: parallel data copy for attach This patch parallelize the data copying work for pcc attach by using multiple threads in the ll_fid_path_copy helper. Nvidia provided performance numbers for this from their environment. This was with 4 MiB I/O size, they reported speed was similar but *slightly* lower at larger block sizes. This is probably an EXT4 limitation since Lustre speed scales with those larger sizes. (As PCC attach is a copy from Lustre to EXT4.) This is for attaching a single 2 TiB file, they also reported no performance regression for datasets with many small files. threads: 1 2 4 8 speed: 4 GiB/s 7.8 GiB/s 14.1 GiB/s 15.2 GiB/s Performance improved only very slightly past 8 threads, and 4 threads is clearly the sweet spot for performance. Signed-off-by: Patrick Farrell Change-Id: Iffbb3892cfb5b2e71afe15d03f9aec9c84975092 Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/51171 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Qian Yingjin Reviewed-by: Andreas Dilger --- lustre/llite/lproc_llite.c | 35 ++++++ lustre/llite/pcc.c | 28 ++++- lustre/llite/pcc.h | 4 + lustre/tests/sanity-pcc.sh | 59 ++++++++-- lustre/utils/Makefile.am | 2 +- lustre/utils/ll_fid_path_copy.c | 251 ++++++++++++++++++++++++++++------------ 6 files changed, 292 insertions(+), 87 deletions(-) diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index cd28ab1..eeee6f9 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -710,6 +710,40 @@ static ssize_t pcc_dio_attach_threshold_store(struct kobject *kobj, } LUSTRE_RW_ATTR(pcc_dio_attach_threshold); +static ssize_t pcc_dio_attach_threads_per_file_show(struct kobject *kobj, + struct attribute *attr, + char *buffer) +{ + struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info, + ll_kset.kobj); + struct pcc_super *super = &sbi->ll_pcc_super; + + return sprintf(buffer, "%u\n", + super->pccs_dio_attach_threads_per_file); +} + +static ssize_t pcc_dio_attach_threads_per_file_store(struct kobject *kobj, + struct attribute *attr, + const char *buffer, size_t count) +{ + struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info, + ll_kset.kobj); + struct pcc_super *super = &sbi->ll_pcc_super; + unsigned int val; + int rc; + + rc = kstrtouint(buffer, 0, &val); + if (rc) + return rc; + + if (val < 1 || val > PCC_DIO_ATTACH_MAX_THREADS_PER_FILE) + return -EINVAL; + + super->pccs_dio_attach_threads_per_file = val; + return count; +} +LUSTRE_RW_ATTR(pcc_dio_attach_threads_per_file); + static ssize_t pcc_max_attach_thread_num_show(struct kobject *kobj, struct attribute *attr, char *buffer) @@ -2105,6 +2139,7 @@ static struct attribute *llite_attrs[] = { &lustre_attr_pcc_dio_attach_size_mb.attr, &lustre_attr_pcc_dio_attach_iosize_mb.attr, &lustre_attr_pcc_dio_attach_threshold.attr, + &lustre_attr_pcc_dio_attach_threads_per_file.attr, &lustre_attr_pcc_max_attach_thread_num.attr, &lustre_attr_pcc_mode.attr, &lustre_attr_pcc_async_affinity.attr, diff --git a/lustre/llite/pcc.c b/lustre/llite/pcc.c index 2b5716a..7997c09 100644 --- a/lustre/llite/pcc.c +++ b/lustre/llite/pcc.c @@ -132,6 +132,8 @@ int pcc_super_init(struct pcc_super *super) super->pccs_async_threshold = PCC_DEFAULT_ASYNC_THRESHOLD; super->pccs_dio_attach_iosize = PCC_DEFAULT_DIO_ATTACH_IOSIZE; super->pccs_dio_attach_threshold = PCC_DEFAULT_DIO_ATTACH_THRESHOLD; + super->pccs_dio_attach_threads_per_file = + PCC_DEFAULT_DIO_ATTACH_THREADS_PER_FILE; super->pccs_mode = S_IRUSR; atomic_set(&super->pccs_attaches_queued, 0); super->pccs_maximum_queued_attaches = PCCS_DEFAULT_ATTACH_QUEUE_DEPTH; @@ -3895,12 +3897,18 @@ static ssize_t pcc_copy_data_dio(struct pcc_super *super, struct file *lu_file, argv[7] = "-s", argv[8] = "", /* iosize */ argv[9] = "-d", /* use DIO */ - argv[10] = NULL, + argv[10] = "-t", + argv[11] = "", /* numthreads */ + argv[12] = NULL, }; + __u32 numthreads = super->pccs_dio_attach_threads_per_file; __u32 iosize = super->pccs_dio_attach_iosize; /* iosize is __u32, 2^32 is 4 billion, which is 10 digits, so + 1 for nul */ #define IOSIZE_MAXLEN 11 char iosize_str[IOSIZE_MAXLEN]; +/* max number of threads is 255, so 3 digits + 1 for nul */ +#define NUMTHREADS_MAXLEN 4 + char numthreads_str[NUMTHREADS_MAXLEN]; char fidstring[FID_LEN]; ssize_t rc = 0; struct lu_fid *srcfid = NULL; @@ -3924,22 +3932,27 @@ static ssize_t pcc_copy_data_dio(struct pcc_super *super, struct file *lu_file, RETURN(-E2BIG); } argv[8] = iosize_str; + if (snprintf(numthreads_str, NUMTHREADS_MAXLEN, "%u", numthreads) >= + NUMTHREADS_MAXLEN) { + RETURN(-E2BIG); + } + argv[11] = numthreads_str; rc = call_usermodehelper(argv[0], argv, envp, UMH_WAIT_PROC); if (rc != 0) { - CERROR("%s: Error copying data for attach, Lustre file: %s, PCC file: %s, Lustre mntpath %s, iosize %u, pid %d rc: %lu:%zd.\n", + CERROR("%s: Error copying data for attach, Lustre file: %s, PCC file: %s, Lustre mntpath %s, iosize %u, threads %d, pid %d rc: %lu:%zd.\n", ll_i2sbi(file_inode(lu_file))->ll_fsname, fidstring, pcc_filepath, super->pccs_lu_pathname, iosize, - current->pid, rc, rc); + numthreads, current->pid, rc, rc); } RETURN(rc); } static ssize_t pcc_copy_data(struct pcc_super *super, struct file *lu_file, struct file *pcc_file, char *pcc_pathname, - __u64 size, bool atomic_open_locked) + __u64 size, bool atomic_open_locked, bool use_dio) { - if (pcc_pathname) { + if (use_dio) { #ifndef HAVE_INODE_RWSEM int rc; struct dentry *dentry = file_dentry(lu_file); @@ -3975,6 +3988,7 @@ static int pcc_attach_data_archive(struct file *lu_file, char *pcc_pathname = NULL; struct path pcc_path; ktime_t kstart = ktime_get(); + bool use_dio = false; ssize_t ret; int flags = O_WRONLY | O_LARGEFILE; int rc; @@ -3991,6 +4005,8 @@ static int pcc_attach_data_archive(struct file *lu_file, super->pccs_dio_attach_iosize != 0) { int pathlen; + use_dio = true; + OBD_ALLOC(pcc_pathname, PATH_MAX); if (!pcc_pathname) GOTO(out, rc = -ENOMEM); @@ -4059,7 +4075,7 @@ static int pcc_attach_data_archive(struct file *lu_file, } ret = pcc_copy_data(super, lu_file, pcc_file, pcc_pathname, filesize, - atomic_open_locked); + atomic_open_locked, use_dio); if (direct) lu_file->f_flags |= O_DIRECT; if (ret < 0) diff --git a/lustre/llite/pcc.h b/lustre/llite/pcc.h index 0e45185..c1e3152 100644 --- a/lustre/llite/pcc.h +++ b/lustre/llite/pcc.h @@ -181,6 +181,9 @@ struct pcc_dataset { * attach is very bad for applications, so we try to be generous. */ #define PCCS_DEFAULT_ATTACH_QUEUE_DEPTH 1024 +/* how many threads to use for copying for each DIO attach operation */ +#define PCC_DEFAULT_DIO_ATTACH_THREADS_PER_FILE 4 +#define PCC_DIO_ATTACH_MAX_THREADS_PER_FILE 256 struct pcc_super { /* Protect pccs_datasets */ struct rw_semaphore pccs_rw_sem; @@ -197,6 +200,7 @@ struct pcc_super { /* Size threshold for asynchrous PCC-RO attach in background. */ __u64 pccs_async_threshold; __u32 pccs_dio_attach_iosize; + __u32 pccs_dio_attach_threads_per_file; __u64 pccs_dio_attach_threshold; bool pccs_async_affinity; umode_t pccs_mode; diff --git a/lustre/tests/sanity-pcc.sh b/lustre/tests/sanity-pcc.sh index 28a604c..b0a5042 100755 --- a/lustre/tests/sanity-pcc.sh +++ b/lustre/tests/sanity-pcc.sh @@ -3799,6 +3799,8 @@ test_49a() { local file=$DIR/$tfile local src=$file.src local dest=$TMP/$tfile + local numthreads=2 + dd if=/dev/urandom of=$src bs=1M count=32 || error "dd to create source file ($src) failed" @@ -3810,7 +3812,9 @@ test_49a() { touch $dest stack_trap "rm -f $dest" EXIT - # Simple test of normal copy behavior + # Simple test of normal copy behavior, single threaded + # (all other tests are multithreaded, which is the main operating mode + # for actual copies) local iosize=$((1024 * 1024)) # 1 MiB ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize || error "(0) fid_path_copy of $srcfid ($src) to $dest failed" @@ -3819,8 +3823,8 @@ test_49a() { rm -f $dest touch $dest - # Normal copy but with DIO - ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d || + # Normal copy but with DIO and multiple threads + ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads || error "(2) fid_path_copy of $srcfid ($src) to $dest failed" cmp -bl $src $dest || error "(3) $dest corrupt after copy from $srcfid ($src)" @@ -3832,7 +3836,7 @@ test_49a() { error "(4) dd to create source file failed" srcfid=$($LFS getstripe -F "$src") # DIO copy with unaligned file size - ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d || + ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads || error "(5) fid_path_copy of $srcfid ($src) to $dest failed" cmp -bl $src $dest || error "(6) $dest corrupt after copy from $srcfid ($src)" @@ -3843,7 +3847,7 @@ test_49a() { dd if=/dev/urandom of=$src bs=2383K count=1 || error "(7) dd to create source file failed" srcfid=$($LFS getstripe -F "$src") - ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d || + ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads || error "(8) fid_path_copy of $srcfid ($src) to $dest failed" cmp -bl $src $dest || error "(9) $dest corrupt after copy from $srcfid ($src)" @@ -3872,17 +3876,26 @@ test_49a() { ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d && error "(15) invalid fid_path_copy succeeded" + #Invalid thread count + numthreads=0 + ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads && + error "(16) invalid fid_path_copy succeeded" + + numthreads=512 + ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads && + error "(17) invalid fid_path_copy succeeded" + rm -f $dest touch $dest iosize=$((1024 * 1024)) echo "non-blksize-aligned file size for PCC backend (blksize = 512)" dd if=/dev/urandom of=$src bs=2383001 count=1 || - error "(16) dd to create source file failed" + error "(18) dd to create source file failed" srcfid=$($LFS getstripe -F "$src") ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d || - error "(17) fid_path_copy of $srcfid ($src) to $dest failed" + error "(19) fid_path_copy of $srcfid ($src) to $dest failed" cmp -bl $src $dest || - error "(18) $dest corrupt after copy from $srcfid ($src)" + error "(20) $dest corrupt after copy from $srcfid ($src)" echo "Test completed successfully" } @@ -3899,6 +3912,8 @@ test_49b() { local dio_threshold=$(do_facet $SINGLEAGT $LCTL get_param -n llite.*.pcc_dio_attach_threshold | head -n 1) stack_trap "do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threshold=$dio_threshold" EXIT + local dio_attach_threads=$(do_facet $SINGLEAGT $LCTL get_param -n llite.*.pcc_dio_attach_threads_per_file | head -n 1) + stack_trap "do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=$dio_attach_threads" EXIT echo "Testing that invalid inputs should be clamped to [min, max] bound" do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_iosize_mb=300 || @@ -3911,6 +3926,12 @@ test_49b() { do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_iosize_mb=0 || error "should be able to set attach size to 0" + do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=0 && + error "shouldn't be able to set threads to 0" + + do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=257 && + error "shouldn't be able to set threads to more than 256" + echo "Normal testing from here on - no errors expected." copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER" @@ -4032,6 +4053,28 @@ test_49b() { do_facet $SINGLEAGT $LFS pcc detach -k $file || error "failed to detach file $file" check_lpcc_state $file "none" + + # Test attach with 4 threads + do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=4 || + error "failed to set threads to 4" + + rm -f $file || error "failed to remove $file" + + # Unaligned file size > DIO attach size (will attach with DIO) + dd if=/dev/urandom of=$file.src bs=20005K count=1 || + error "failed to dd write to $file.src" + + dd if=$file.src of=$file bs=20005K count=1 || + error "failed dd to write $file" + + echo "Start to RO-PCC attach/detach the file: $file" + do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file || + error "failed to attach file $file" + check_lpcc_state $file "readonly" + cmp -bl $file.src $file || error "$file corrupt after attach" + do_facet $SINGLEAGT $LFS pcc detach -k $file || + error "failed to detach file $file" + check_lpcc_state $file "none" } run_test 49b "Basic testing of DIO attach and sanity testing of tunables" diff --git a/lustre/utils/Makefile.am b/lustre/utils/Makefile.am index 132275b..a9a930f 100644 --- a/lustre/utils/Makefile.am +++ b/lustre/utils/Makefile.am @@ -86,7 +86,7 @@ lustre_rsync_LDADD := $(LIBLUSTREAPI) $(PTHREAD_LIBS) lustre_rsync_DEPENDENCIES := liblustreapi.la ll_fid_path_copy_SOURCES = ll_fid_path_copy.c -ll_fid_path_copy_LDADD := liblustreapi.la +ll_fid_path_copy_LDADD := $(LIBLUSTREAPI) $(PTHREAD_LIBS) ll_fid_path_copy_DEPENDENCIES := liblustreapi.la llsom_sync_LDADD := $(LIBLUSTREAPI) diff --git a/lustre/utils/ll_fid_path_copy.c b/lustre/utils/ll_fid_path_copy.c index 73dd98f..9e868f8 100644 --- a/lustre/utils/ll_fid_path_copy.c +++ b/lustre/utils/ll_fid_path_copy.c @@ -1,46 +1,67 @@ -#include -#include #include +#include #include +#include #include -#include +#include +#include #include +#include #include #include -#include +#include +#include "lstddef.h" + void usage(void) { - printf("Usage: %s: [OPTION]...\n" + printf("Usage: %s [OPTION]...\n" "Copy data from a file on Lustre (specified by fid) to a file specified by path\n" "\n" "Mandatory arguments to long options are mandatory for short options too.\n" -" -d, --direct_io use direct I/O\n" -" -F, --source_fid [fid] fid of source file for copy\n" -" -h, --help display this help and exit\n" -" -l, --lustre_dir [dir] path to a directory on Lustre\n" -" -p, --destination_path [path] path of destination for copy\n" -" -s, --iosize [size] do reads/writes of [size] bytes, must be 1 MiB aligned\n", +" -d, --direct_io use direct I/O\n" +" -F, --source_fid [fid] fid of source file for copy\n" +" -h, --help display this help and exit\n" +" -l, --lustre_dir [dir] path to a directory on Lustre\n" +" -p, --destination_path [path] path of destination for copy\n" +" -s, --iosize [size] do reads/writes of [size] bytes, must be 1 MiB aligned\n" +" -t, --threads [num] number of threads (default: 1)\n", "ll_fid_path_copy"); } -static ssize_t copy_data(int src_fd, int dst_fd, size_t iosize, - int extra_open_flags) +struct chunk_data { + ssize_t* copied_total; + int extra_open_flags; + size_t iosize; + off_t offset; + off_t end_offset; + int src_fd; + int dst_fd; +}; + +void *copy_data_threaded(void *arg) { + struct chunk_data *chunk = arg; + int extra_open_flags = chunk->extra_open_flags; + ssize_t* copied_total = chunk->copied_total; + off_t end_offset = chunk->end_offset; size_t page_size = sysconf(_SC_PAGESIZE); - size_t copied_total = 0; - void *buf = NULL; - off_t offset = 0; + size_t iosize = chunk->iosize; + off_t offset = chunk->offset; + int src_fd = chunk->src_fd; + int dst_fd = chunk->dst_fd; + void* buf = NULL; ssize_t rc = 0; + long int thread = syscall(__NR_gettid); rc = posix_memalign(&buf, page_size, iosize); if (rc) { - fprintf(stderr, "%s: memalign failed: rc = %ld\n", - program_invocation_short_name, rc); - return rc; + fprintf(stderr, "%s: memalign failed, thread %ld, size %lu: rc = %ld\n", + program_invocation_short_name, thread, iosize, rc); + return NULL; } - while (true) { + while (1) { ssize_t rsz; ssize_t wsz; @@ -48,31 +69,33 @@ static ssize_t copy_data(int src_fd, int dst_fd, size_t iosize, if (rsz < 0) { rc = -errno; fprintf(stderr, - "%s: failed to read from source, rc = %ld\n", - program_invocation_short_name, rc); + "%s: failed to read from source thread %ld, rc = %ld\n", + program_invocation_short_name, thread, rc); break; } + /* EOF, copy is done */ if (rsz == 0) break; -retry_write: + /* Retry writing if interrupted */ + retry_write: wsz = pwrite(dst_fd, buf, rsz, offset); if (wsz < 0) { rc = -errno; - /* - * Unaligned direct IO reports error code -EINVAL - * on the local file system (such as Ext4). - * In this case, fallback to buffered IO any way. + + /* Unaligned direct IO reports error code -EINVAL on the + * local file system (such as Ext4). In this case, + * fallback to buffered IO. */ if (extra_open_flags & O_DIRECT && rc == -EINVAL) { extra_open_flags &= ~O_DIRECT; rc = fcntl(dst_fd, F_SETFL, extra_open_flags); if (rc < 0) { rc = -errno; - fprintf(stderr, "%s: failed to clear O_DIRECT flag: rc = %ld", - program_invocation_short_name, - rc); + fprintf(stderr, + "%s: failed to clear O_DIRECT flag: rc = %ld", + program_invocation_short_name, rc); break; } rc = 0; @@ -85,72 +108,139 @@ retry_write: break; } - copied_total += wsz; + *copied_total += wsz; offset += wsz; + if (offset == end_offset) + break; } - if (rc == 0) - rc = copied_total; - free(buf); - return rc; - + return NULL; } -static ssize_t ll_fid_path_copy(const char *lustre_path, - const struct lu_fid *src_fid, - const char *dst_path, size_t iosize, - int extra_open_flags) +static ssize_t ll_fid_path_copy(const char* lustre_path, + const struct lu_fid* src_fid, + const char* dst_path, size_t iosize, + int extra_open_flags, int num_threads) { + struct chunk_data *thread_chunks = NULL; + ssize_t* copied_totals = NULL; + pthread_t* threads = NULL; + int max_threads_for_size; struct stat stbuf; + off_t chunk_size; + off_t offset = 0; int src_fd = -1; int dst_fd = -1; ssize_t rc = 0; + int i; + /* Open source file */ src_fd = llapi_open_by_fid(lustre_path, src_fid, O_RDONLY | extra_open_flags); if (src_fd < 0) { rc = -errno; fprintf(stderr, - "%s: failed to open source %s/"DFID": rc = %ld\n", + "%s: failed to open source %s/" DFID ": rc = %ld\n", program_invocation_short_name, lustre_path, PFID(src_fid), rc); return rc; } + /* Get source file information */ rc = fstat(src_fd, &stbuf); if (rc < 0) { fprintf(stderr, - "%s: failed to stat source %s/"DFID": rc = %ld\n", + "%s: failed to stat source %s/" DFID ": rc = %ld\n", program_invocation_short_name, lustre_path, PFID(src_fid), rc); goto out; } + /* Open destination file */ dst_fd = open(dst_path, O_WRONLY | O_LARGEFILE | extra_open_flags); if (dst_fd < 0) { rc = -errno; - fprintf(stderr, "%s: failed to open destination %s: rc = %ld\n", + fprintf(stderr, + "%s: failed to open destination %s: rc = %ld\n", program_invocation_short_name, dst_path, rc); goto out; } - rc = copy_data(src_fd, dst_fd, iosize, extra_open_flags); - if (rc < 0) { - errno = -rc; - fprintf(stderr, - "%s: failed to copy data from %s/"DFID" to %s: rc = %ld.\n", - program_invocation_short_name, lustre_path, - PFID(src_fid), dst_path, rc); - goto out; + /* Calculate the maximum number of threads that can be used based on + * file size and iosize. + * + * This assumes the cost of starting a thread is broadly similar to the + * cost of doing a single IO, which is reasonable because threads are + * cheap to spawn and read-write ops are relatively expensive. + * + * But because of synchronization/waiting overhead, etc, we do at least + * 2 instead of 1 IO per thread. + */ + max_threads_for_size = stbuf.st_size / (iosize * 2); + + if (max_threads_for_size == 0) + max_threads_for_size = 1; + + /* Adjust the number of threads based on the file size and iosize */ + num_threads = min(num_threads, max_threads_for_size); + + /* Calculate the initial chunk_size with the adjusted num_threads */ + chunk_size = (stbuf.st_size + num_threads - 1) / num_threads; + + /* Round chunk_size to the next iosize */ + chunk_size += iosize - (chunk_size % iosize); + + /* allocate thread tracking info */ + threads = malloc(num_threads * sizeof(pthread_t)); + copied_totals = malloc(num_threads * sizeof(ssize_t)); + thread_chunks = malloc(num_threads * sizeof(struct chunk_data)); + + for (i = 0; i < num_threads; i++) { + int result; + + copied_totals[i] = 0; + + thread_chunks[i].src_fd = src_fd; + thread_chunks[i].dst_fd = dst_fd; + thread_chunks[i].iosize = iosize; + thread_chunks[i].extra_open_flags = extra_open_flags; + thread_chunks[i].offset = offset; + thread_chunks[i].end_offset = offset + chunk_size; + thread_chunks[i].copied_total = &copied_totals[i]; + + result = pthread_create(&threads[i], NULL, (void* (*)(void*))copy_data_threaded, + &thread_chunks[i]); + if (result != 0) { + fprintf(stdout, + "%s: failed to create thread %d: %d\n", + program_invocation_short_name, i, result); + rc = -result; + goto join_threads; + } + + offset += chunk_size; + } + +join_threads: + for (i = 0; i < num_threads; i++) { + if (pthread_join(threads[i], NULL) != 0) { + fprintf(stdout, "%s: failed to join thread %d\n", + program_invocation_short_name, i); + rc = -1; + } + + /* Accumulate the total copied bytes */ + rc += copied_totals[i]; } if (rc != stbuf.st_size) { - fprintf(stderr, + fprintf(stdout, "%s: incomplete copy: copied %lu bytes of %lu\n", program_invocation_short_name, rc, stbuf.st_size); rc = -EIO; } + out: if (src_fd > 0) close(src_fd); @@ -158,11 +248,16 @@ out: if (dst_fd > 0) close(dst_fd); + free(threads); + free(copied_totals); + free(thread_chunks); + return rc; } -int main(int argc, char *argv[]) -{ +#define MAX_THREADS 255 + +int main(int argc, char* argv[]) { static struct option options[] = { { .name = "direct_io", .has_arg = no_argument, .val = 'd' }, { .name = "src_fid", .has_arg = required_argument, .val = 'F' }, @@ -170,17 +265,19 @@ int main(int argc, char *argv[]) { .name = "lustre_path", .has_arg = required_argument, .val = 'l' }, { .name = "dst_path", .has_arg = required_argument, .val = 'p' }, { .name = "iosize", .has_arg = required_argument, .val = 's' }, + { .name = "threads", .has_arg = required_argument, .val = 't' }, }; - char *dst_path = NULL; - char *lustre_path = NULL; - struct lu_fid src_fid; - char *fidstr = NULL; int extra_open_flags = 0; - __u32 iosize = 0; - ssize_t rc; + char* lustre_path = NULL; + char* dst_path = NULL; + struct lu_fid src_fid; + char* fidstr = NULL; + int num_threads = 1; + size_t iosize = 0; + ssize_t rc = 0; int c; - while ((c = getopt_long(argc, argv, "dF:hl:p:s:", options, NULL)) != -1) { + while ((c = getopt_long(argc, argv, "dF:hl:p:s:t:", options, NULL)) != -1) { switch (c) { case 'd': extra_open_flags = O_DIRECT; @@ -201,6 +298,9 @@ int main(int argc, char *argv[]) case 's': iosize = strtol(optarg, NULL, 0); break; + case 't': + num_threads = atoi(optarg); + break; } } @@ -233,33 +333,40 @@ int main(int argc, char *argv[]) } if (iosize % (1024 * 1024) != 0) { - fprintf(stderr, "%s: iosize (%u) must be 1 MiB aligned\n", + fprintf(stderr, "%s: iosize (%lu) must be 1 MiB aligned\n", program_invocation_short_name, iosize); usage(); exit(EXIT_FAILURE); } - fprintf(stdout, - "%s: copying from %s to %s, iosize %d, using %s i/o.\n", - program_invocation_short_name, fidstr, dst_path, iosize, - extra_open_flags == 0 ? "buffered" : "direct"); + if (num_threads < 1 || num_threads > MAX_THREADS) { + fprintf(stderr, + "%s: number of threads (%d) must be > 0 and <= %u\n", + program_invocation_short_name, num_threads, + MAX_THREADS); + usage(); + exit(EXIT_FAILURE); + } + + fprintf(stdout, "Copying from %s to %s, iosize %lu, using %s I/O with %d threads\n", + fidstr, dst_path, iosize, + (extra_open_flags == 0 ? "buffered" : "direct"), num_threads); rc = llapi_fid_parse(fidstr, &src_fid, NULL); if (rc < 0) { - fprintf(stderr, "%s: failed to parse fid: %s\n", + fprintf(stderr, "%s: Failed to parse fid: %s\n", program_invocation_short_name, fidstr); exit(EXIT_FAILURE); } - rc = ll_fid_path_copy(lustre_path, &src_fid, dst_path, iosize, - extra_open_flags); + extra_open_flags, num_threads); if (rc < 0) exit(EXIT_FAILURE); - fprintf(stdout, - "%s: successfully copied %lu bytes from %s to %s.\n", - program_invocation_short_name, rc, fidstr, dst_path); + fprintf(stdout, "Successfully copied %lu bytes from %s to %s.\n", + rc, fidstr, dst_path); exit(EXIT_SUCCESS); } + -- 1.8.3.1