Whamcloud - gitweb
EX-7585 pcc: parallel data copy for attach
author Patrick Farrell <pfarrell@whamcloud.com>
Tue, 30 May 2023 21:48:15 +0000 (17:48 -0400)
committer Andreas Dilger <adilger@whamcloud.com>
Fri, 1 Sep 2023 13:19:14 +0000 (13:19 +0000)
This patch parallelizes the data copying work for PCC attach
by using multiple threads in the ll_fid_path_copy helper.

Nvidia provided performance numbers for this from their
environment.  These were measured with a 4 MiB I/O size; they
reported that speed was similar but *slightly* lower at larger
block sizes.  This is probably an EXT4 limitation, since Lustre
speed scales with those larger sizes (PCC attach is a copy
from Lustre to EXT4).

The numbers below are for attaching a single 2 TiB file; they
also reported no performance regression for datasets with many
small files.

threads: 1       2         4          8
speed:   4 GiB/s 7.8 GiB/s 14.1 GiB/s 15.2 GiB/s

Performance improved only modestly from 4 to 8 threads and
only very slightly past 8 threads, so 4 threads is clearly
the sweet spot for performance.

Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: Iffbb3892cfb5b2e71afe15d03f9aec9c84975092
Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/51171
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Qian Yingjin <qian@ddn.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/llite/lproc_llite.c
lustre/llite/pcc.c
lustre/llite/pcc.h
lustre/tests/sanity-pcc.sh
lustre/utils/Makefile.am
lustre/utils/ll_fid_path_copy.c

index cd28ab1..eeee6f9 100644 (file)
@@ -710,6 +710,40 @@ static ssize_t pcc_dio_attach_threshold_store(struct kobject *kobj,
 }
 LUSTRE_RW_ATTR(pcc_dio_attach_threshold);
 
+static ssize_t pcc_dio_attach_threads_per_file_show(struct kobject *kobj,
+                                                   struct attribute *attr,
+                                                   char *buffer)
+{
+       struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+                                             ll_kset.kobj);
+       struct pcc_super *super = &sbi->ll_pcc_super;
+
+       return sprintf(buffer, "%u\n",
+                      super->pccs_dio_attach_threads_per_file);
+}
+
+static ssize_t pcc_dio_attach_threads_per_file_store(struct kobject *kobj,
+                                                    struct attribute *attr,
+                                                    const char *buffer, size_t count)
+{
+       struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+                                             ll_kset.kobj);
+       struct pcc_super *super = &sbi->ll_pcc_super;
+       unsigned int val;
+       int rc;
+
+       rc = kstrtouint(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       if (val < 1 || val > PCC_DIO_ATTACH_MAX_THREADS_PER_FILE)
+               return -EINVAL;
+
+       super->pccs_dio_attach_threads_per_file = val;
+       return count;
+}
+LUSTRE_RW_ATTR(pcc_dio_attach_threads_per_file);
+
 static ssize_t
 pcc_max_attach_thread_num_show(struct kobject *kobj, struct attribute *attr,
                               char *buffer)
@@ -2105,6 +2139,7 @@ static struct attribute *llite_attrs[] = {
        &lustre_attr_pcc_dio_attach_size_mb.attr,
        &lustre_attr_pcc_dio_attach_iosize_mb.attr,
        &lustre_attr_pcc_dio_attach_threshold.attr,
+       &lustre_attr_pcc_dio_attach_threads_per_file.attr,
        &lustre_attr_pcc_max_attach_thread_num.attr,
        &lustre_attr_pcc_mode.attr,
        &lustre_attr_pcc_async_affinity.attr,
index 2b5716a..7997c09 100644 (file)
@@ -132,6 +132,8 @@ int pcc_super_init(struct pcc_super *super)
        super->pccs_async_threshold = PCC_DEFAULT_ASYNC_THRESHOLD;
        super->pccs_dio_attach_iosize = PCC_DEFAULT_DIO_ATTACH_IOSIZE;
        super->pccs_dio_attach_threshold = PCC_DEFAULT_DIO_ATTACH_THRESHOLD;
+       super->pccs_dio_attach_threads_per_file =
+               PCC_DEFAULT_DIO_ATTACH_THREADS_PER_FILE;
        super->pccs_mode = S_IRUSR;
        atomic_set(&super->pccs_attaches_queued, 0);
        super->pccs_maximum_queued_attaches = PCCS_DEFAULT_ATTACH_QUEUE_DEPTH;
@@ -3895,12 +3897,18 @@ static ssize_t pcc_copy_data_dio(struct pcc_super *super, struct file *lu_file,
                argv[7] = "-s",
                argv[8] = "", /* iosize */
                argv[9] = "-d", /* use DIO */
-               argv[10] = NULL,
+               argv[10] = "-t",
+               argv[11] = "", /* numthreads */
+               argv[12] = NULL,
        };
+       __u32 numthreads = super->pccs_dio_attach_threads_per_file;
        __u32 iosize = super->pccs_dio_attach_iosize;
 /* iosize is __u32, 2^32 is 4 billion, which is 10 digits, so + 1 for nul */
 #define IOSIZE_MAXLEN 11
        char iosize_str[IOSIZE_MAXLEN];
+/* max number of threads is 255, so 3 digits + 1 for nul */
+#define NUMTHREADS_MAXLEN 4
+       char numthreads_str[NUMTHREADS_MAXLEN];
        char fidstring[FID_LEN];
        ssize_t rc = 0;
        struct lu_fid *srcfid = NULL;
@@ -3924,22 +3932,27 @@ static ssize_t pcc_copy_data_dio(struct pcc_super *super, struct file *lu_file,
                RETURN(-E2BIG);
        }
        argv[8] = iosize_str;
+       if (snprintf(numthreads_str, NUMTHREADS_MAXLEN, "%u", numthreads) >=
+           NUMTHREADS_MAXLEN) {
+               RETURN(-E2BIG);
+       }
+       argv[11] = numthreads_str;
 
        rc = call_usermodehelper(argv[0], argv, envp, UMH_WAIT_PROC);
        if (rc != 0) {
-               CERROR("%s: Error copying data for attach, Lustre file: %s, PCC file: %s, Lustre mntpath %s, iosize %u, pid %d rc: %lu:%zd.\n",
+               CERROR("%s: Error copying data for attach, Lustre file: %s, PCC file: %s, Lustre mntpath %s, iosize %u, threads %d, pid %d rc: %lu:%zd.\n",
                       ll_i2sbi(file_inode(lu_file))->ll_fsname, fidstring,
                       pcc_filepath, super->pccs_lu_pathname, iosize,
-                      current->pid, rc, rc);
+                      numthreads, current->pid, rc, rc);
        }
        RETURN(rc);
 }
 
 static ssize_t pcc_copy_data(struct pcc_super *super, struct file *lu_file,
                             struct file *pcc_file, char *pcc_pathname,
-                            __u64 size, bool atomic_open_locked)
+                            __u64 size, bool atomic_open_locked, bool use_dio)
 {
-       if (pcc_pathname) {
+       if (use_dio) {
 #ifndef HAVE_INODE_RWSEM
                int rc;
                struct dentry *dentry = file_dentry(lu_file);
@@ -3975,6 +3988,7 @@ static int pcc_attach_data_archive(struct file *lu_file,
        char *pcc_pathname = NULL;
        struct path pcc_path;
        ktime_t kstart = ktime_get();
+       bool use_dio = false;
        ssize_t ret;
        int flags = O_WRONLY | O_LARGEFILE;
        int rc;
@@ -3991,6 +4005,8 @@ static int pcc_attach_data_archive(struct file *lu_file,
            super->pccs_dio_attach_iosize != 0) {
                int pathlen;
 
+               use_dio = true;
+
                OBD_ALLOC(pcc_pathname, PATH_MAX);
                if (!pcc_pathname)
                        GOTO(out, rc = -ENOMEM);
@@ -4059,7 +4075,7 @@ static int pcc_attach_data_archive(struct file *lu_file,
        }
 
        ret = pcc_copy_data(super, lu_file, pcc_file, pcc_pathname, filesize,
-                           atomic_open_locked);
+                           atomic_open_locked, use_dio);
        if (direct)
                lu_file->f_flags |= O_DIRECT;
        if (ret < 0)
index 0e45185..c1e3152 100644 (file)
@@ -181,6 +181,9 @@ struct pcc_dataset {
  * attach is very bad for applications, so we try to be generous.
  */
 #define PCCS_DEFAULT_ATTACH_QUEUE_DEPTH        1024
+/* how many threads to use for copying for each DIO attach operation */
+#define PCC_DEFAULT_DIO_ATTACH_THREADS_PER_FILE 4
+#define PCC_DIO_ATTACH_MAX_THREADS_PER_FILE 256
 struct pcc_super {
        /* Protect pccs_datasets */
        struct rw_semaphore      pccs_rw_sem;
@@ -197,6 +200,7 @@ struct pcc_super {
        /* Size threshold for asynchrous PCC-RO attach in background. */
        __u64                    pccs_async_threshold;
        __u32                    pccs_dio_attach_iosize;
+       __u32                    pccs_dio_attach_threads_per_file;
        __u64                    pccs_dio_attach_threshold;
        bool                     pccs_async_affinity;
        umode_t                  pccs_mode;
index 28a604c..b0a5042 100755 (executable)
@@ -3799,6 +3799,8 @@ test_49a() {
        local file=$DIR/$tfile
        local src=$file.src
        local dest=$TMP/$tfile
+       local numthreads=2
+
        dd if=/dev/urandom of=$src bs=1M count=32 ||
                error "dd to create source file ($src) failed"
 
@@ -3810,7 +3812,9 @@ test_49a() {
        touch $dest
        stack_trap "rm -f $dest" EXIT
 
-       # Simple test of normal copy behavior
+       # Simple test of normal copy behavior, single threaded
+       # (all other tests are multithreaded, which is the main operating mode
+       #  for actual copies)
        local iosize=$((1024 * 1024)) # 1 MiB
        ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize ||
                error "(0) fid_path_copy of $srcfid ($src) to $dest failed"
@@ -3819,8 +3823,8 @@ test_49a() {
        rm -f $dest
        touch $dest
 
-       # Normal copy but with DIO
-       ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d ||
+       # Normal copy but with DIO and multiple threads
+       ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads ||
                error "(2) fid_path_copy of $srcfid ($src) to $dest failed"
        cmp -bl $src $dest ||
                error "(3) $dest corrupt after copy from $srcfid ($src)"
@@ -3832,7 +3836,7 @@ test_49a() {
                error "(4) dd to create source file failed"
        srcfid=$($LFS getstripe -F "$src")
        # DIO copy with unaligned file size
-       ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d ||
+       ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads ||
                error "(5) fid_path_copy of $srcfid ($src) to $dest failed"
        cmp -bl $src $dest ||
                error "(6) $dest corrupt after copy from $srcfid ($src)"
@@ -3843,7 +3847,7 @@ test_49a() {
        dd if=/dev/urandom of=$src bs=2383K count=1 ||
                error "(7) dd to create source file failed"
        srcfid=$($LFS getstripe -F "$src")
-       ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d ||
+       ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads ||
                error "(8) fid_path_copy of $srcfid ($src) to $dest failed"
        cmp -bl $src $dest ||
                error "(9) $dest corrupt after copy from $srcfid ($src)"
@@ -3872,17 +3876,26 @@ test_49a() {
        ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d &&
                error "(15) invalid fid_path_copy succeeded"
 
+       #Invalid thread count
+       numthreads=0
+       ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads &&
+               error "(16) invalid fid_path_copy succeeded"
+
+       numthreads=512
+       ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d -t $numthreads &&
+               error "(17) invalid fid_path_copy succeeded"
+
        rm -f $dest
        touch $dest
        iosize=$((1024 * 1024))
        echo "non-blksize-aligned file size for PCC backend (blksize = 512)"
        dd if=/dev/urandom of=$src bs=2383001 count=1 ||
-               error "(16) dd to create source file failed"
+               error "(18) dd to create source file failed"
        srcfid=$($LFS getstripe -F "$src")
        ll_fid_path_copy -F $srcfid -l $MOUNT -p $dest -s $iosize -d ||
-               error "(17) fid_path_copy of $srcfid ($src) to $dest failed"
+               error "(19) fid_path_copy of $srcfid ($src) to $dest failed"
        cmp -bl $src $dest ||
-               error "(18) $dest corrupt after copy from $srcfid ($src)"
+               error "(20) $dest corrupt after copy from $srcfid ($src)"
 
        echo "Test completed successfully"
 }
@@ -3899,6 +3912,8 @@ test_49b() {
 
        local dio_threshold=$(do_facet $SINGLEAGT $LCTL get_param -n llite.*.pcc_dio_attach_threshold | head -n 1)
        stack_trap "do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threshold=$dio_threshold" EXIT
+       local dio_attach_threads=$(do_facet $SINGLEAGT $LCTL get_param -n llite.*.pcc_dio_attach_threads_per_file | head -n 1)
+       stack_trap "do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=$dio_attach_threads" EXIT
 
        echo "Testing that invalid inputs should be clamped to [min, max] bound"
        do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_iosize_mb=300 ||
@@ -3911,6 +3926,12 @@ test_49b() {
        do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_iosize_mb=0 ||
                error "should be able to set attach size to 0"
 
+       do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=0 &&
+               error "shouldn't be able to set threads to 0"
+
+       do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=257 &&
+               error "shouldn't be able to set threads to more than 256"
+
        echo "Normal testing from here on - no errors expected."
 
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
@@ -4032,6 +4053,28 @@ test_49b() {
        do_facet $SINGLEAGT $LFS pcc detach -k $file ||
                error "failed to detach file $file"
        check_lpcc_state $file "none"
+
+       # Test attach with 4 threads
+       do_facet $SINGLEAGT $LCTL set_param llite.*.pcc_dio_attach_threads_per_file=4 ||
+               error "failed to set threads to 4"
+
+       rm -f $file || error "failed to remove $file"
+
+       # Unaligned file size > DIO attach size (will attach with DIO)
+       dd if=/dev/urandom of=$file.src bs=20005K count=1 ||
+               error "failed to dd write to $file.src"
+
+       dd if=$file.src of=$file bs=20005K count=1 ||
+               error "failed dd to write $file"
+
+       echo "Start to RO-PCC attach/detach the file: $file"
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       cmp -bl $file.src $file || error "$file corrupt after attach"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
 }
 run_test 49b "Basic testing of DIO attach and sanity testing of tunables"
 
index 132275b..a9a930f 100644 (file)
@@ -86,7 +86,7 @@ lustre_rsync_LDADD :=  $(LIBLUSTREAPI) $(PTHREAD_LIBS)
 lustre_rsync_DEPENDENCIES := liblustreapi.la
 
 ll_fid_path_copy_SOURCES = ll_fid_path_copy.c
-ll_fid_path_copy_LDADD :=  liblustreapi.la
+ll_fid_path_copy_LDADD :=  $(LIBLUSTREAPI) $(PTHREAD_LIBS)
 ll_fid_path_copy_DEPENDENCIES := liblustreapi.la
 
 llsom_sync_LDADD := $(LIBLUSTREAPI)
index 73dd98f..9e868f8 100644 (file)
@@ -1,46 +1,67 @@
-#include <stdio.h>
-#include <unistd.h>
 #include <errno.h>
+#include <fcntl.h>
 #include <getopt.h>
+#include <pthread.h>
 #include <stdlib.h>
-#include <lustre/lustreapi.h>
+#include <stdio.h>
+#include <unistd.h>
 #include <linux/lustre/lustre_idl.h>
+#include <lustre/lustreapi.h>
 #include <sys/types.h>
 #include <sys/stat.h>
-#include <fcntl.h>
+#include <sys/syscall.h>
+#include "lstddef.h"
+
 
 void usage(void)
 {
-       printf("Usage: %s: [OPTION]...\n"
+       printf("Usage: %s [OPTION]...\n"
 "Copy data from a file on Lustre (specified by fid) to a file specified by path\n"
 "\n"
 "Mandatory arguments to long options are mandatory for short options too.\n"
-"  -d, --direct_io              use direct I/O\n"
-"  -F, --source_fid [fid]       fid of source file for copy\n"
-"  -h, --help                   display this help and exit\n"
-"  -l, --lustre_dir [dir]       path to a directory on Lustre\n"
-"  -p, --destination_path [path] path of destination for copy\n"
-"  -s, --iosize [size]          do reads/writes of [size] bytes, must be 1 MiB aligned\n",
+"  -d, --direct_io             use direct I/O\n"
+"  -F, --source_fid [fid]      fid of source file for copy\n"
+"  -h, --help                  display this help and exit\n"
+"  -l, --lustre_dir [dir]      path to a directory on Lustre\n"
+"  -p, --destination_path      [path] path of destination for copy\n"
+"  -s, --iosize [size]         do reads/writes of [size] bytes, must be 1 MiB aligned\n"
+"  -t, --threads [num]         number of threads (default: 1)\n",
 "ll_fid_path_copy");
 }
 
-static ssize_t copy_data(int src_fd, int dst_fd, size_t iosize,
-                        int extra_open_flags)
+struct chunk_data {
+       ssize_t* copied_total;
+       int extra_open_flags;
+       size_t iosize;
+       off_t offset;
+       off_t end_offset;
+       int src_fd;
+       int dst_fd;
+};
+
+void *copy_data_threaded(void *arg)
 {
+       struct chunk_data *chunk = arg;
+       int extra_open_flags = chunk->extra_open_flags;
+       ssize_t* copied_total = chunk->copied_total;
+       off_t end_offset = chunk->end_offset;
        size_t page_size = sysconf(_SC_PAGESIZE);
-       size_t copied_total = 0;
-       void *buf = NULL;
-       off_t offset = 0;
+       size_t iosize = chunk->iosize;
+       off_t offset = chunk->offset;
+       int src_fd = chunk->src_fd;
+       int dst_fd = chunk->dst_fd;
+       void* buf = NULL;
        ssize_t rc = 0;
+       long int thread = syscall(__NR_gettid);
 
        rc = posix_memalign(&buf, page_size, iosize);
        if (rc) {
-               fprintf(stderr, "%s: memalign failed: rc = %ld\n",
-                       program_invocation_short_name, rc);
-               return rc;
+               fprintf(stderr, "%s: memalign failed, thread %ld, size %lu: rc = %ld\n",
+                       program_invocation_short_name, thread, iosize, rc);
+               return NULL;
        }
 
-       while (true) {
+       while (1) {
                ssize_t rsz;
                ssize_t wsz;
 
@@ -48,31 +69,33 @@ static ssize_t copy_data(int src_fd, int dst_fd, size_t iosize,
                if (rsz < 0) {
                        rc = -errno;
                        fprintf(stderr,
-                               "%s: failed to read from source, rc = %ld\n",
-                               program_invocation_short_name, rc);
+                               "%s: failed to read from source thread %ld, rc = %ld\n",
+                               program_invocation_short_name, thread, rc);
                        break;
                }
+
                /* EOF, copy is done */
                if (rsz == 0)
                        break;
 
-retry_write:
+               /* Retry writing if interrupted */
+       retry_write:
                wsz = pwrite(dst_fd, buf, rsz, offset);
                if (wsz < 0) {
                        rc = -errno;
-                       /*
-                        * Unaligned direct IO reports error code -EINVAL
-                        * on the local file system (such as Ext4).
-                        * In this case, fallback to buffered IO any way.
+
+                       /* Unaligned direct IO reports error code -EINVAL on the
+                        * local file system (such as Ext4). In this case,
+                        * fallback to buffered IO.
                         */
                        if (extra_open_flags & O_DIRECT && rc == -EINVAL) {
                                extra_open_flags &= ~O_DIRECT;
                                rc = fcntl(dst_fd, F_SETFL, extra_open_flags);
                                if (rc < 0) {
                                        rc = -errno;
-                                       fprintf(stderr, "%s: failed to clear O_DIRECT flag: rc = %ld",
-                                               program_invocation_short_name,
-                                               rc);
+                                       fprintf(stderr,
+                                               "%s: failed to clear O_DIRECT flag: rc = %ld",
+                                               program_invocation_short_name, rc);
                                        break;
                                }
                                rc = 0;
@@ -85,72 +108,139 @@ retry_write:
                        break;
                }
 
-               copied_total += wsz;
+               *copied_total += wsz;
                offset += wsz;
+               if (offset == end_offset)
+                       break;
        }
 
-       if (rc == 0)
-               rc = copied_total;
-
        free(buf);
-       return rc;
-
+       return NULL;
 }
 
-static ssize_t ll_fid_path_copy(const char *lustre_path,
-                               const struct lu_fid *src_fid,
-                               const char *dst_path, size_t iosize,
-                               int extra_open_flags)
+static ssize_t ll_fid_path_copy(const char *lustre_path,
+                               const struct lu_fid *src_fid,
+                               const char *dst_path, size_t iosize,
+                               int extra_open_flags, int num_threads)
 {
+       struct chunk_data *thread_chunks = NULL;
+       ssize_t* copied_totals = NULL;
+       pthread_t* threads = NULL;
+       int max_threads_for_size;
        struct stat stbuf;
+       off_t chunk_size;
+       off_t offset = 0;
        int src_fd = -1;
        int dst_fd = -1;
        ssize_t rc = 0;
+       int i;
 
+       /* Open source file */
        src_fd = llapi_open_by_fid(lustre_path, src_fid,
                                   O_RDONLY | extra_open_flags);
        if (src_fd < 0) {
                rc = -errno;
                fprintf(stderr,
-                       "%s: failed to open source %s/"DFID": rc = %ld\n",
+                       "%s: failed to open source %s/" DFID ": rc = %ld\n",
                        program_invocation_short_name, lustre_path,
                        PFID(src_fid), rc);
                return rc;
        }
 
+       /* Get source file information */
        rc = fstat(src_fd, &stbuf);
        if (rc < 0) {
                fprintf(stderr,
-                       "%s: failed to stat source %s/"DFID": rc = %ld\n",
+                       "%s: failed to stat source %s/" DFID ": rc = %ld\n",
                        program_invocation_short_name, lustre_path,
                        PFID(src_fid), rc);
                goto out;
        }
 
+       /* Open destination file */
        dst_fd = open(dst_path, O_WRONLY | O_LARGEFILE | extra_open_flags);
        if (dst_fd < 0) {
                rc = -errno;
-               fprintf(stderr, "%s: failed to open destination %s: rc = %ld\n",
+               fprintf(stderr,
+                       "%s: failed to open destination %s: rc = %ld\n",
                        program_invocation_short_name, dst_path, rc);
                goto out;
        }
 
-       rc = copy_data(src_fd, dst_fd, iosize, extra_open_flags);
-       if (rc < 0) {
-               errno = -rc;
-               fprintf(stderr,
-                       "%s: failed to copy data from %s/"DFID" to %s: rc = %ld.\n",
-                       program_invocation_short_name, lustre_path,
-                       PFID(src_fid), dst_path, rc);
-               goto out;
+       /* Calculate the maximum number of threads that can be used based on
+        * file size and iosize.
+        *
+        * This assumes the cost of starting a thread is broadly similar to the
+        * cost of doing a single IO, which is reasonable because threads are
+        * cheap to spawn and read-write ops are relatively expensive.
+        *
+        * But because of synchronization/waiting overhead, etc, we do at least
+        * 2 instead of 1 IO per thread.
+        */
+       max_threads_for_size = stbuf.st_size / (iosize * 2);
+
+       if (max_threads_for_size == 0)
+               max_threads_for_size = 1;
+
+       /* Adjust the number of threads based on the file size and iosize */
+       num_threads = min(num_threads, max_threads_for_size);
+
+       /* Calculate the initial chunk_size with the adjusted num_threads */
+       chunk_size = (stbuf.st_size + num_threads - 1) / num_threads;
+
+       /* Round chunk_size to the next iosize */
+       chunk_size += iosize - (chunk_size % iosize);
+
+       /* allocate thread tracking info */
+       threads = malloc(num_threads * sizeof(pthread_t));
+       copied_totals = malloc(num_threads * sizeof(ssize_t));
+       thread_chunks = malloc(num_threads * sizeof(struct chunk_data));
+
+       for (i = 0; i < num_threads; i++) {
+               int result;
+
+               copied_totals[i] = 0;
+
+               thread_chunks[i].src_fd = src_fd;
+               thread_chunks[i].dst_fd = dst_fd;
+               thread_chunks[i].iosize = iosize;
+               thread_chunks[i].extra_open_flags = extra_open_flags;
+               thread_chunks[i].offset = offset;
+               thread_chunks[i].end_offset = offset + chunk_size;
+               thread_chunks[i].copied_total = &copied_totals[i];
+
+               result = pthread_create(&threads[i], NULL, (void* (*)(void*))copy_data_threaded,
+                                       &thread_chunks[i]);
+               if (result != 0) {
+                       fprintf(stdout,
+                               "%s: failed to create thread %d: %d\n",
+                               program_invocation_short_name, i, result);
+                       rc = -result;
+                       goto join_threads;
+               }
+
+               offset += chunk_size;
+       }
+
+join_threads:
+       for (i = 0; i < num_threads; i++) {
+               if (pthread_join(threads[i], NULL) != 0) {
+                       fprintf(stdout, "%s: failed to join thread %d\n",
+                               program_invocation_short_name, i);
+                       rc = -1;
+               }
+
+               /* Accumulate the total copied bytes */
+               rc += copied_totals[i];
        }
 
        if (rc != stbuf.st_size) {
-               fprintf(stderr,
+               fprintf(stdout,
                        "%s: incomplete copy: copied %lu bytes of %lu\n",
                        program_invocation_short_name, rc, stbuf.st_size);
                rc = -EIO;
        }
+
 out:
        if (src_fd > 0)
                close(src_fd);
@@ -158,11 +248,16 @@ out:
        if (dst_fd > 0)
                close(dst_fd);
 
+       free(threads);
+       free(copied_totals);
+       free(thread_chunks);
+
        return rc;
 }
 
-int main(int argc, char *argv[])
-{
+#define MAX_THREADS 255
+
+int main(int argc, char* argv[]) {
        static struct option options[] = {
                { .name = "direct_io", .has_arg = no_argument, .val = 'd' },
                { .name = "src_fid", .has_arg = required_argument, .val = 'F' },
@@ -170,17 +265,19 @@ int main(int argc, char *argv[])
                { .name = "lustre_path", .has_arg = required_argument, .val = 'l' },
                { .name = "dst_path", .has_arg = required_argument, .val = 'p' },
                { .name = "iosize", .has_arg = required_argument, .val = 's' },
+               { .name = "threads", .has_arg = required_argument, .val = 't' },
        };
-       char *dst_path = NULL;
-       char *lustre_path = NULL;
-       struct lu_fid src_fid;
-       char *fidstr = NULL;
        int extra_open_flags = 0;
-       __u32 iosize = 0;
-       ssize_t rc;
+       char* lustre_path = NULL;
+       char* dst_path = NULL;
+       struct lu_fid src_fid;
+       char* fidstr = NULL;
+       int num_threads = 1;
+       size_t iosize = 0;
+       ssize_t rc = 0;
        int c;
 
-       while ((c = getopt_long(argc, argv, "dF:hl:p:s:", options, NULL)) != -1) {
+       while ((c = getopt_long(argc, argv, "dF:hl:p:s:t:", options, NULL)) != -1) {
                switch (c) {
                        case 'd':
                                extra_open_flags = O_DIRECT;
@@ -201,6 +298,9 @@ int main(int argc, char *argv[])
                        case 's':
                                iosize = strtol(optarg, NULL, 0);
                                break;
+                       case 't':
+                               num_threads = atoi(optarg);
+                               break;
                }
        }
 
@@ -233,33 +333,40 @@ int main(int argc, char *argv[])
        }
 
        if (iosize % (1024 * 1024) != 0) {
-               fprintf(stderr, "%s: iosize (%u) must be 1 MiB aligned\n",
+               fprintf(stderr, "%s: iosize (%lu) must be 1 MiB aligned\n",
                        program_invocation_short_name, iosize);
                usage();
                exit(EXIT_FAILURE);
        }
 
-       fprintf(stdout,
-               "%s: copying from %s to %s, iosize %d, using %s i/o.\n",
-               program_invocation_short_name, fidstr, dst_path, iosize,
-               extra_open_flags == 0 ? "buffered" : "direct");
+       if (num_threads < 1 || num_threads > MAX_THREADS) {
+               fprintf(stderr,
+                       "%s: number of threads (%d) must be > 0 and <= %u\n",
+                       program_invocation_short_name, num_threads,
+                       MAX_THREADS);
+               usage();
+               exit(EXIT_FAILURE);
+       }
+
+       fprintf(stdout, "Copying from %s to %s, iosize %lu, using %s I/O with %d threads\n",
+               fidstr, dst_path, iosize,
+               (extra_open_flags == 0 ? "buffered" : "direct"), num_threads);
 
        rc = llapi_fid_parse(fidstr, &src_fid, NULL);
        if (rc < 0) {
-               fprintf(stderr, "%s: failed to parse fid: %s\n",
+               fprintf(stderr, "%s: Failed to parse fid: %s\n",
                        program_invocation_short_name, fidstr);
                exit(EXIT_FAILURE);
        }
 
-
        rc = ll_fid_path_copy(lustre_path, &src_fid, dst_path, iosize,
-                             extra_open_flags);
+                             extra_open_flags, num_threads);
        if (rc < 0)
                exit(EXIT_FAILURE);
 
-       fprintf(stdout,
-               "%s: successfully copied %lu bytes from %s to %s.\n",
-               program_invocation_short_name, rc, fidstr, dst_path);
+       fprintf(stdout, "Successfully copied %lu bytes from %s to %s.\n",
+               rc, fidstr, dst_path);
 
        exit(EXIT_SUCCESS);
 }
+