*
* This particular tool can also import an existing HSM archive.
*/
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <dirent.h>
unsigned long long o_bandwidth;
size_t o_chunk_size;
enum ct_action o_action;
+ char *o_event_fifo;
char *o_mnt;
char *o_hsm_root;
char *o_src; /* for import, or rebind */
struct options opt = {
.o_copy_attrs = 1,
.o_shadow_tree = 1,
- .o_verbose = LLAPI_MSG_WARN,
+ .o_verbose = LLAPI_MSG_INFO,
.o_copy_xattrs = 1,
.o_report_int = REPORT_INTERVAL_DEFAULT,
.o_chunk_size = ONE_MB,
"into a Lustre filesystem.\n"
" Usage:\n"
" %s [options] --import <src> <dst> <lustre_mount_point>\n"
- " import an archived subtree at\n"
- " <src> (relative to hsm_root) into the Lustre filesystem at\n"
- " <dst> (absolute)\n"
+ " import an archived subtree from\n"
+ " <src> (FID or relative path to hsm_root) into the Lustre\n"
+ " filesystem at\n"
+ " <dst> (absolute path)\n"
" %s [options] --rebind <old_FID> <new_FID> <lustre_mount_point>\n"
" rebind an entry in the HSM to a new FID\n"
" <old_FID> old FID the HSM entry is bound to\n"
" each line of <list_file> consists of <old_FID> <new_FID>\n"
" %s [options] --max-sequence <fsname>\n"
" return the max fid sequence of archived files\n"
- " -A, --archive <#> Archive number (repeatable)\n"
- " -p, --hsm-root <path> Target HSM mount point\n"
- " -q, --quiet Produce less verbose output\n"
- " -v, --verbose Produce more verbose output\n"
- " -c, --chunk-size <sz> I/O size used during data copy\n"
- " (unit can be used, default is MB)\n"
- " --abort-on-error Abort operation on major error\n"
- " --dry-run Don't run, just show what would be done\n"
- " --bandwidth <bw> Limit I/O bandwidth (unit can be used\n,"
- " default is MB)\n",
+ " --abort-on-error Abort operation on major error\n"
+ " -A, --archive <#> Archive number (repeatable)\n"
+ " -b, --bandwidth <bw> Limit I/O bandwidth (unit can be used\n,"
+ " default is MB)\n"
+ " --dry-run Don't run, just show what would be done\n"
+ " -c, --chunk-size <sz> I/O size used during data copy\n"
+ " (unit can be used, default is MB)\n"
+ " -f, --event-fifo <path> Write events stream to fifo\n"
+ " -p, --hsm-root <path> Target HSM mount point\n"
+ " -q, --quiet Produce less verbose output\n"
+ " -u, --update-interval <s> Interval between progress reports sent\n"
+ " to Coordinator\n"
+ " -v, --verbose Produce more verbose output\n",
cmd_name, cmd_name, cmd_name, cmd_name, cmd_name);
exit(rc);
{"chunk-size", required_argument, NULL, 'c'},
{"chunk_size", required_argument, NULL, 'c'},
{"daemon", no_argument, &opt.o_daemonize, 1},
+ {"event-fifo", required_argument, NULL, 'f'},
+ {"event_fifo", required_argument, NULL, 'f'},
{"dry-run", no_argument, &opt.o_dry_run, 1},
{"help", no_argument, NULL, 'h'},
{"hsm-root", required_argument, NULL, 'p'},
{"no_xattr", no_argument, &opt.o_copy_xattrs, 0},
{"quiet", no_argument, NULL, 'q'},
{"rebind", no_argument, NULL, 'r'},
- {"report", required_argument, &opt.o_report_int, 0},
+ {"update-interval", required_argument, NULL, 'u'},
+ {"update_interval", required_argument, NULL, 'u'},
{"verbose", no_argument, NULL, 'v'},
{0, 0, 0, 0}
};
unsigned long long unit;
optind = 0;
- while ((c = getopt_long(argc, argv, "A:b:c:hiMp:qrv",
+ while ((c = getopt_long(argc, argv, "A:b:c:f:hiMp:qru:v",
long_opts, NULL)) != -1) {
switch (c) {
case 'A':
else
opt.o_bandwidth = value;
break;
+ case 'f':
+ opt.o_event_fifo = optarg;
+ break;
case 'h':
usage(argv[0], 0);
case 'i':
case 'r':
opt.o_action = CA_REBIND;
break;
+ case 'u':
+ opt.o_report_int = atoi(optarg);
+ if (opt.o_report_int < 0) {
+ rc = -EINVAL;
+ CT_ERROR(rc, "bad value for -%c '%s'", c,
+ optarg);
+ return rc;
+ }
+ break;
case 'v':
opt.o_verbose++;
break;
return rc;
}
-/* non-blocking read or write */
-static int nonblock_rw(bool wr, int fd, char *buf, int size)
+/*
+ * Throttle the copy loop so the average throughput stays at or below
+ * opt.o_bandwidth (bytes per second).
+ *
+ * \param wsize  number of bytes just written by the caller
+ *
+ * Byte and time accounting is kept in function-local statics, so the budget
+ * is shared by every caller in the process. The accounting window restarts
+ * when more than 5 seconds elapsed since the previous call.
+ *
+ * opt.o_bandwidth is used as a divisor: the caller must only invoke this
+ * function when a non-zero bandwidth limit is configured.
+ */
+static void bandwidth_ctl_delay(int wsize)
{
- int rc;
-
- if (wr)
- rc = write(fd, buf, size);
- else
- rc = read(fd, buf, size);
-
- if ((rc < 0) && (errno == -EAGAIN)) {
- fd_set set;
- struct timeval timeout;
-
- timeout.tv_sec = opt.o_report_int;
-
- FD_ZERO(&set);
- FD_SET(fd, &set);
- if (wr)
- rc = select(FD_SETSIZE, NULL, &set, NULL, &timeout);
- else
- rc = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
- if (rc < 0)
- return -errno;
- if (rc == 0)
- /* Timed out, we read nothing */
- return -EAGAIN;
-
- /* Should be available now */
- if (wr)
- rc = write(fd, buf, size);
- else
- rc = read(fd, buf, size);
- }
-
- if (rc < 0)
- rc = -errno;
-
- return rc;
+	static unsigned long long tot_bytes;
+	static time_t start_time;
+	static time_t last_time;
+	time_t now = time(0);
+	double tot_time;
+	double excess;
+	unsigned int sleep_time = 0;
+
+	/* Idle for more than 5s: start a fresh accounting window */
+	if (now > last_time + 5) {
+		tot_bytes = 0;
+		start_time = last_time = now;
+	}
+
+	tot_bytes += wsize;
+	tot_time = now - start_time;
+	/* time() has 1s resolution; never account for less than 1s */
+	if (tot_time < 1)
+		tot_time = 1;
+
+	/* Bytes written beyond what the budget allows for the elapsed time */
+	excess = tot_bytes - tot_time * opt.o_bandwidth;
+
+	/* Converting a negative double to an unsigned integer is undefined
+	 * behaviour, so only derive a delay when we are ahead of budget. */
+	if (excess > 0)
+		sleep_time = excess * 1000000 / opt.o_bandwidth;
+
+	if ((now - start_time) % 10 == 1)
+		CT_TRACE("bandwith control: excess=%E sleep for %uus", excess,
+			 sleep_time);
+
+	if (excess > 0)
+		usleep(sleep_time);
+
+	last_time = now;
}
static int ct_copy_data(struct hsm_copyaction_private *hcp, const char *src,
const struct hsm_action_item *hai, long hal_flags)
{
struct hsm_extent he;
+ __u64 offset = hai->hai_extent.offset;
struct stat src_st;
struct stat dst_st;
- char *buf;
- __u64 wpos = 0;
- __u64 rpos = 0;
- __u64 rlen;
- time_t last_print_time = time(0);
- int rsize;
- int wsize;
- int bufoff = 0;
+ char *buf = NULL;
+ __u64 write_total = 0;
+ __u64 length;
+ time_t last_print_time = time(NULL);
int rc = 0;
CT_TRACE("going to copy data from '%s' to '%s'", src, dst);
- buf = malloc(opt.o_chunk_size);
- if (buf == NULL)
- return -ENOMEM;
-
if (fstat(src_fd, &src_st) < 0) {
- CT_ERROR(errno, "cannot stat '%s'", src);
- return -errno;
+ rc = -errno;
+ CT_ERROR(rc, "cannot stat '%s'", src);
+ return rc;
}
if (!S_ISREG(src_st.st_mode)) {
return rc;
}
- rc = lseek(src_fd, hai->hai_extent.offset, SEEK_SET);
- if (rc < 0) {
- CT_ERROR(errno,
- "cannot seek for read to "LPU64" (len %jd) in '%s'",
- hai->hai_extent.offset, (intmax_t)src_st.st_size, src);
- rc = -errno;
- goto out;
- }
-
if (fstat(dst_fd, &dst_st) < 0) {
- CT_ERROR(errno, "cannot stat '%s'", dst);
- return -errno;
+ rc = -errno;
+ CT_ERROR(rc, "cannot stat '%s'", dst);
+ return rc;
}
if (!S_ISREG(dst_st.st_mode)) {
return rc;
}
- rc = lseek(dst_fd, hai->hai_extent.offset, SEEK_SET);
+ rc = lseek(src_fd, hai->hai_extent.offset, SEEK_SET);
if (rc < 0) {
rc = -errno;
- CT_ERROR(rc, "cannot seek for write to "LPU64" on '%s'",
- hai->hai_extent.offset, src);
- goto out;
+ CT_ERROR(rc,
+ "cannot seek for read to "LPU64" (len %jd) in '%s'",
+ hai->hai_extent.offset, (intmax_t)src_st.st_size, src);
+ return rc;
}
- he.offset = hai->hai_extent.offset;
+ /* Don't read beyond a given extent */
+ length = min(hai->hai_extent.length, src_st.st_size);
+
+ he.offset = offset;
he.length = 0;
- rc = llapi_hsm_action_progress(hcp, &he, 0);
+ rc = llapi_hsm_action_progress(hcp, &he, length, 0);
if (rc < 0) {
/* Action has been canceled or something wrong
* is happening. Stop copying data. */
}
errno = 0;
- /* Don't read beyond a given extent */
- rlen = (hai->hai_extent.length == -1LL) ?
- src_st.st_size : hai->hai_extent.length;
-
- while (wpos < rlen) {
- int chunk = (rlen - wpos > opt.o_chunk_size) ?
- opt.o_chunk_size : rlen - wpos;
-
- /* Only read more if we wrote everything in the buffer */
- if (wpos == rpos) {
- rsize = nonblock_rw(0, src_fd, buf, chunk);
- if (rsize == 0)
- /* EOF */
- break;
- if (rsize == -EAGAIN) {
- /* Timed out */
- rsize = 0;
- if (rpos == 0) {
- /* Haven't read anything yet, let's
- * give it back to the coordinator
- * for rescheduling */
- rc = -EAGAIN;
- break;
- }
- }
+ buf = malloc(opt.o_chunk_size);
+ if (buf == NULL) {
+ rc = -ENOMEM;
+ goto out;
+ }
- if (rsize < 0) {
- rc = rsize;
- CT_ERROR(rc, "cannot read from '%s'", src);
- break;
- }
+ CT_DEBUG("Going to copy "LPU64" bytes %s -> %s\n", length, src, dst);
- rpos += rsize;
- bufoff = 0;
- }
+ while (write_total < length) {
+ ssize_t rsize;
+ ssize_t wsize;
+ int chunk = (length - write_total > opt.o_chunk_size) ?
+ opt.o_chunk_size : length - write_total;
+
+ rsize = pread(src_fd, buf, chunk, offset);
+ if (rsize == 0)
+ /* EOF */
+ break;
- wsize = nonblock_rw(1, dst_fd, buf + bufoff, rpos - wpos);
- if (wsize == -EAGAIN)
- /* Timed out */
- wsize = 0;
+ if (rsize < 0) {
+ rc = -errno;
+ CT_ERROR(rc, "cannot read from '%s'", src);
+ break;
+ }
+ wsize = pwrite(dst_fd, buf, rsize, offset);
if (wsize < 0) {
- rc = wsize;
+ rc = -errno;
CT_ERROR(rc, "cannot write to '%s'", dst);
break;
}
- wpos += wsize;
- bufoff += wsize;
-
- if (opt.o_bandwidth != 0) {
- static unsigned long long tot_bytes;
- static time_t start_time, last_time;
- time_t now = time(0);
- double tot_time, excess;
- unsigned int sleep_time;
-
- if (now > last_time + 5) {
- tot_bytes = 0;
- start_time = last_time = now;
- }
-
- tot_bytes += wsize;
- tot_time = now - start_time;
- if (tot_time < 1)
- tot_time = 1;
-
- excess = tot_bytes - tot_time * opt.o_bandwidth;
- sleep_time = excess * 1000000 / opt.o_bandwidth;
- if ((now - start_time) % 10 == 1)
- CT_TRACE("bandwith control: excess=%E"
- " sleep for %dus",
- excess, sleep_time);
+ write_total += wsize;
+ offset += wsize;
- if (excess > 0)
- usleep(sleep_time);
-
- last_time = now;
- }
+ if (opt.o_bandwidth != 0)
+ /* sleep if needed, to honor bandwidth limits */
+ bandwidth_ctl_delay(wsize);
if (time(0) >= last_print_time + opt.o_report_int) {
last_print_time = time(0);
- CT_TRACE("%%"LPU64" ", 100 * wpos / rlen);
- he.length = wpos;
- rc = llapi_hsm_action_progress(hcp, &he, 0);
+ CT_TRACE("%%"LPU64" ", 100 * write_total / length);
+ he.length = write_total;
+ rc = llapi_hsm_action_progress(hcp, &he, length, 0);
if (rc < 0) {
/* Action has been canceled or something wrong
* is happening. Stop copying data. */
rc = ftruncate(dst_fd, src_st.st_size);
if (rc < 0) {
rc = -errno;
- CT_ERROR(rc,
- "cannot truncate '%s' to size %jd",
+ CT_ERROR(rc, "cannot truncate '%s' to size %jd",
dst, (intmax_t)src_st.st_size);
err_major++;
}
}
- free(buf);
+ if (buf != NULL)
+ free(buf);
return rc;
}
static int ct_fini(struct hsm_copyaction_private **phcp,
const struct hsm_action_item *hai, int hp_flags, int ct_rc)
{
- char lstr[PATH_MAX];
- int rc;
+ struct hsm_copyaction_private *hcp;
+ char lstr[PATH_MAX];
+ int rc;
CT_TRACE("Action completed, notifying coordinator "
"cookie="LPX64", FID="DFID", hp_flags=%d err=%d",
hp_flags, -ct_rc);
ct_path_lustre(lstr, sizeof(lstr), opt.o_mnt, &hai->hai_fid);
+
+ if (phcp == NULL || *phcp == NULL) {
+ rc = llapi_hsm_action_begin(&hcp, ctdata, hai, -1, 0, true);
+ if (rc < 0) {
+ CT_ERROR(rc, "llapi_hsm_action_begin() on '%s' failed",
+ lstr);
+ return rc;
+ }
+ phcp = &hcp;
+ }
+
rc = llapi_hsm_action_end(phcp, &hai->hai_extent, hp_flags, abs(ct_rc));
if (rc == -ECANCELED)
CT_ERROR(rc, "completed action on '%s' has been canceled: "
{
struct hsm_copyaction_private *hcp = NULL;
char src[PATH_MAX];
- char dst[PATH_MAX];
+ char dst[PATH_MAX] = "";
int rc;
int rcf = 0;
bool rename_needed = false;
int linkno = 0;
char *ptr;
int depth = 0;
- int sz;
+ ssize_t sz;
sprintf(buf, DFID, PFID(&hai->hai_fid));
sprintf(src, "%s/shadow/", opt.o_hsm_root);
}
/* symlink already exists ? */
sz = readlink(src, buf, sizeof(buf));
+ /* detect truncation */
+ if (sz == sizeof(buf)) {
+ rcf = rcf ? rcf : -E2BIG;
+ CT_ERROR(rcf, "readlink '%s' truncated", src);
+ goto fini_minor;
+ }
if (sz >= 0) {
buf[sz] = '\0';
if (sz == 0 || strncmp(buf, dst, sz) != 0) {
if (!(dst_fd < 0))
close(dst_fd);
- if (hcp != NULL)
- rc = ct_fini(&hcp, hai, hp_flags, rcf);
+ rc = ct_fini(&hcp, hai, hp_flags, rcf);
return rc;
}
CT_TRACE("data restore from '%s' to '%s' done", src, dst);
fini:
- if (hcp != NULL)
- rc = ct_fini(&hcp, hai, hp_flags, rc);
+ rc = ct_fini(&hcp, hai, hp_flags, rc);
/* object swaping is done by cdt at copy end, so close of volatile file
* cannot be done before */
}
fini:
- if (hcp != NULL)
- rc = ct_fini(&hcp, hai, 0, rc);
-
- return rc;
-}
-
-static int ct_report_error(const struct hsm_action_item *hai, int flags,
- int errval)
-{
- struct hsm_copyaction_private *hcp;
- int rc;
-
- rc = llapi_hsm_action_begin(&hcp, ctdata, hai, -1, 0, true);
- if (rc < 0)
- return rc;
-
- rc = llapi_hsm_action_end(&hcp, &hai->hai_extent, flags, abs(errval));
+ rc = ct_fini(&hcp, hai, 0, rc);
return rc;
}
CT_ERROR(rc, "unknown action %d, on '%s'", hai->hai_action,
opt.o_mnt);
err_minor++;
- ct_report_error(hai, 0, rc);
+ ct_fini(NULL, hai, 0, rc);
}
return 0;
+/*
+ * Build "<dirname>/<basename>" in a newly allocated string.
+ *
+ * Returns the string (to be released with free() by the caller), or NULL
+ * when asprintf() reports a failure.
+ */
static char *path_concat(const char *dirname, const char *basename)
{
char *result;
- int dirlen = strlen(dirname);
- result = malloc(dirlen + strlen(basename) + 2);
- if (result == NULL)
+	/* asprintf() sizes and allocates the buffer itself */
+	if (asprintf(&result, "%s/%s", dirname, basename) < 0)
return NULL;
- memcpy(result, dirname, dirlen);
- result[dirlen] = '/';
- strcpy(result + dirlen + 1, basename);
-
return result;
}
+/*
+ * Import a single archived object designated by its FID.
+ *
+ * \param import_fid  FID of the object to import
+ *
+ * The import is refused when the path built by ct_path_lustre() for this FID
+ * under opt.o_mnt is already accessible (-EEXIST), or when its existence
+ * cannot be checked (negated errno from access()). Otherwise the archive
+ * path built by ct_path_archive() under opt.o_hsm_root is handed to
+ * ct_import_one() together with opt.o_dst.
+ *
+ * Returns a negative errno on refusal, else the result of ct_import_one().
+ */
+static int ct_import_fid(const lustre_fid *import_fid)
+{
+	char	fid_path[PATH_MAX];
+	int	rc;
+
+	ct_path_lustre(fid_path, sizeof(fid_path), opt.o_mnt, import_fid);
+	rc = access(fid_path, F_OK);
+	if (rc == 0 || errno != ENOENT) {
+		/* A successful access() does not reset errno, so decide on
+		 * its return value: a stale errno must not be reported in
+		 * place of -EEXIST. */
+		rc = (rc == 0) ? -EEXIST : -errno;
+		CT_ERROR(rc, "cannot import '"DFID"'", PFID(import_fid));
+		return rc;
+	}
+
+	/* fid_path is reused to hold the location inside the archive */
+	ct_path_archive(fid_path, sizeof(fid_path), opt.o_hsm_root,
+			import_fid);
+
+	CT_TRACE("Resolving "DFID" to %s", PFID(import_fid), fid_path);
+
+	return ct_import_one(fid_path, opt.o_dst);
+}
+
static int ct_import_recurse(const char *relpath)
{
DIR *dir;
struct dirent ent, *cookie = NULL;
char *srcpath, *newpath;
+ lustre_fid import_fid;
int rc;
if (relpath == NULL)
return -EINVAL;
+ /* Is relpath a FID? In which case SFID should expand to three
+ * elements. */
+ rc = sscanf(relpath, SFID, RFID(&import_fid));
+ if (rc == 3)
+ return ct_import_fid(&import_fid);
+
srcpath = path_concat(opt.o_hsm_root, relpath);
if (srcpath == NULL) {
err_major++;
strncat(src, ".lov", sizeof(src) - strlen(src) - 1);
strncat(dst, ".lov", sizeof(dst) - strlen(dst) - 1);
if (rename(src, dst))
- CT_ERROR(errno, "cannot '%s' rename to '%s'", src, dst);
+ CT_ERROR(errno, "cannot rename '%s' to '%s'", src, dst);
}
return 0;
sprintf(path + strlen(path), "/%04x", subseq);
}
- printf("max_sequence: %016Lx\n", seq);
+ printf("max_sequence: "LPX64"\n", seq);
return 0;
}
}
}
+ setbuf(stdout, NULL);
+
+ if (opt.o_event_fifo != NULL) {
+ rc = llapi_hsm_register_event_fifo(opt.o_event_fifo);
+ if (rc < 0) {
+ CT_ERROR(rc, "failed to register event fifo");
+ return rc;
+ }
+ llapi_error_callback_set(llapi_hsm_log_error);
+ }
+
rc = llapi_hsm_copytool_register(&ctdata, opt.o_mnt, 0,
opt.o_archive_cnt, opt.o_archive_id);
if (rc < 0) {
hal->hal_fsname, hal->hal_archive_id, hal->hal_count);
if (strcmp(hal->hal_fsname, fs_name) != 0) {
- CT_ERROR(EINVAL, "'%s' invalid fs name, expecting: %s",
+ rc = -EINVAL;
+ CT_ERROR(rc, "'%s' invalid fs name, expecting: %s",
hal->hal_fsname, fs_name);
err_major++;
if (opt.o_abort_on_error)
hai = hai_next(hai);
}
- llapi_hsm_action_list_free(&hal);
-
if (opt.o_abort_on_error && err_major)
break;
}
llapi_hsm_copytool_unregister(&ctdata);
+ if (opt.o_event_fifo != NULL)
+ llapi_hsm_unregister_event_fifo(opt.o_event_fifo);
return rc;
}
if (rc < 0) {
CT_ERROR(rc, "cannot find a Lustre filesystem mounted at '%s'",
opt.o_mnt);
- return -rc;
+ return rc;
}
return rc;
return -rc;
}
- ct_setup();
+ rc = ct_setup();
+ if (rc < 0)
+ goto error_cleanup;
switch (opt.o_action) {
case CA_IMPORT:
" rc=%d (%s)", err_major, err_minor, rc,
strerror(-rc));
+error_cleanup:
ct_cleanup();
return -rc;