return rc;
}
-/* non-blocking read or write */
-static int nonblock_rw(bool wr, int fd, char *buf, int size)
+static void bandwidth_ctl_delay(int wsize)
{
- int rc;
-
- if (wr)
- rc = write(fd, buf, size);
- else
- rc = read(fd, buf, size);
-
- if ((rc < 0) && (errno == -EAGAIN)) {
- fd_set set;
- struct timeval timeout;
-
- timeout.tv_sec = opt.o_report_int;
-
- FD_ZERO(&set);
- FD_SET(fd, &set);
- if (wr)
- rc = select(FD_SETSIZE, NULL, &set, NULL, &timeout);
- else
- rc = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
- if (rc < 0)
- return -errno;
- if (rc == 0)
- /* Timed out, we read nothing */
- return -EAGAIN;
-
- /* Should be available now */
- if (wr)
- rc = write(fd, buf, size);
- else
- rc = read(fd, buf, size);
- }
-
- if (rc < 0)
- rc = -errno;
-
- return rc;
+ static unsigned long long tot_bytes;
+ static time_t start_time;
+ static time_t last_time;
+ time_t now = time(0);
+ double tot_time;
+ double excess;
+ unsigned int sleep_time;
+
+ if (now > last_time + 5) {
+ tot_bytes = 0;
+ start_time = last_time = now;
+ }
+
+ tot_bytes += wsize;
+ tot_time = now - start_time;
+ if (tot_time < 1)
+ tot_time = 1;
+
+ excess = tot_bytes - tot_time * opt.o_bandwidth;
+ sleep_time = excess * 1000000 / opt.o_bandwidth;
+ if ((now - start_time) % 10 == 1)
+ CT_TRACE("bandwith control: excess=%E sleep for %dus", excess,
+ sleep_time);
+
+ if (excess > 0)
+ usleep(sleep_time);
+
+ last_time = now;
}
static int ct_copy_data(struct hsm_copyaction_private *hcp, const char *src,
const struct hsm_action_item *hai, long hal_flags)
{
struct hsm_extent he;
+ __u64 offset = hai->hai_extent.offset;
struct stat src_st;
struct stat dst_st;
char *buf;
- __u64 wpos = 0;
- __u64 rpos = 0;
- __u64 rlen;
- time_t last_print_time = time(0);
- int rsize;
- int wsize;
- int bufoff = 0;
+ __u64 write_total = 0;
+ __u64 length;
+ time_t last_print_time = time(NULL);
int rc = 0;
CT_TRACE("going to copy data from '%s' to '%s'", src, dst);
return -ENOMEM;
if (fstat(src_fd, &src_st) < 0) {
- CT_ERROR(errno, "cannot stat '%s'", src);
- return -errno;
+ rc = -errno;
+ CT_ERROR(rc, "cannot stat '%s'", src);
+ return rc;
}
if (!S_ISREG(src_st.st_mode)) {
}
if (fstat(dst_fd, &dst_st) < 0) {
- CT_ERROR(errno, "cannot stat '%s'", dst);
- return -errno;
+ rc = -errno;
+ CT_ERROR(rc, "cannot stat '%s'", dst);
+ return rc;
}
if (!S_ISREG(dst_st.st_mode)) {
goto out;
}
- rc = lseek(dst_fd, hai->hai_extent.offset, SEEK_SET);
- if (rc < 0) {
- rc = -errno;
- CT_ERROR(rc, "cannot seek for write to "LPU64" on '%s'",
- hai->hai_extent.offset, src);
- goto out;
- }
-
- he.offset = hai->hai_extent.offset;
+ he.offset = offset;
he.length = 0;
rc = llapi_hsm_action_progress(hcp, &he, 0);
if (rc < 0) {
errno = 0;
/* Don't read beyond a given extent */
- rlen = min(hai->hai_extent.length, src_st.st_size);
-
- CT_DEBUG("Going to copy "LPU64" bytes %s -> %s\n", rlen, src, dst);
+ length = min(hai->hai_extent.length, src_st.st_size);
- while (wpos < rlen) {
- int chunk = (rlen - wpos > opt.o_chunk_size) ?
- opt.o_chunk_size : rlen - wpos;
+ CT_DEBUG("Going to copy "LPU64" bytes %s -> %s\n", length, src, dst);
- /* Only read more if we wrote everything in the buffer */
- if (wpos == rpos) {
- rsize = nonblock_rw(0, src_fd, buf, chunk);
- if (rsize == 0)
- /* EOF */
- break;
-
- if (rsize == -EAGAIN) {
- /* Timed out */
- rsize = 0;
- if (rpos == 0) {
- /* Haven't read anything yet, let's
- * give it back to the coordinator
- * for rescheduling */
- rc = -EAGAIN;
- break;
- }
- }
+ while (write_total < length) {
+ ssize_t rsize;
+ ssize_t wsize;
+ int chunk = (length - write_total > opt.o_chunk_size) ?
+ opt.o_chunk_size : length - write_total;
- if (rsize < 0) {
- rc = rsize;
- CT_ERROR(rc, "cannot read from '%s'", src);
- break;
- }
+ rsize = pread(src_fd, buf, chunk, offset);
+ if (rsize == 0)
+ /* EOF */
+ break;
- rpos += rsize;
- bufoff = 0;
+ if (rsize < 0) {
+ rc = -errno;
+ CT_ERROR(rc, "cannot read from '%s'", src);
+ break;
}
- wsize = nonblock_rw(1, dst_fd, buf + bufoff, rpos - wpos);
- if (wsize == -EAGAIN)
- /* Timed out */
- wsize = 0;
-
+ wsize = pwrite(dst_fd, buf, rsize, offset);
if (wsize < 0) {
- rc = wsize;
+ rc = -errno;
CT_ERROR(rc, "cannot write to '%s'", dst);
break;
}
- wpos += wsize;
- bufoff += wsize;
-
- if (opt.o_bandwidth != 0) {
- static unsigned long long tot_bytes;
- static time_t start_time, last_time;
- time_t now = time(0);
- double tot_time, excess;
- unsigned int sleep_time;
+ write_total += wsize;
+ offset += wsize;
- if (now > last_time + 5) {
- tot_bytes = 0;
- start_time = last_time = now;
- }
-
- tot_bytes += wsize;
- tot_time = now - start_time;
- if (tot_time < 1)
- tot_time = 1;
-
- excess = tot_bytes - tot_time * opt.o_bandwidth;
- sleep_time = excess * 1000000 / opt.o_bandwidth;
- if ((now - start_time) % 10 == 1)
- CT_TRACE("bandwith control: excess=%E"
- " sleep for %dus",
- excess, sleep_time);
-
- if (excess > 0)
- usleep(sleep_time);
-
- last_time = now;
- }
+ if (opt.o_bandwidth != 0)
+ /* sleep if needed, to honor bandwidth limits */
+ bandwidth_ctl_delay(wsize);
if (time(0) >= last_print_time + opt.o_report_int) {
last_print_time = time(0);
- CT_TRACE("%%"LPU64" ", 100 * wpos / rlen);
- he.length = wpos;
+ CT_TRACE("%%"LPU64" ", 100 * write_total / length);
+ he.length = write_total;
rc = llapi_hsm_action_progress(hcp, &he, 0);
if (rc < 0) {
/* Action has been canceled or something wrong