#include <libssh/callbacks.h>
#include "policy.h"
#include "list.h"
-#include "flist.h"
#include "debug.h"
#include "lipe_object_attrs.h"
#include "lipe_ssh.h"
static void usage(void)
{
- /*
- * internal option:
- * --iml-re-socket, unix domain socket to dump FIDs to resync
- * --iml-ex-socket, unix domain socket to dump FIDs to extend
- */
printf("\nUsage: %s [options]\n"
"options:\n"
"\t-a, --min-age, seconds before record handled (default: %u)\n"
char *rj_mirror_opts;
void (*rj_callback)(struct resync_job *, void *, int rc);
void *rj_callback_data;
- unsigned long rj_start;
+ time_t rj_start;
+ time_t rj_done_timestamp;
};
enum amigo_resync_type {
.o_alr_hot_after_idle = DEF_HOT_AFTER_IDLE,
};
+struct history {
+ struct lu_fid h_fid;
+ enum amigo_resync_type h_result;
+};
+
struct stats {
unsigned long s_read; /* llog records read */
unsigned long s_skipped; /* llog records skipped */
unsigned long s_replicate_ro2hot;
unsigned long s_replicate_rw2hot;
unsigned long s_replicate_rw2cold;
+ unsigned long s_skip_by_rule;
+ unsigned long s_extend_by_pool;
+ unsigned long s_extend_by_objects;
+ unsigned long s_skip_unknown;
+ unsigned long s_resync_stale;
+ unsigned long s_skip_insync;
+ unsigned long s_skip_by_source;
+ unsigned long s_extend_by_target;
+ struct history s_hist[100];
+ int s_hist_cur;
};
struct stats stats = { 0 };
+static void lamigo_hist_add(struct lu_fid *fid, enum amigo_resync_type result)
+{
+ struct history *h = stats.s_hist;
+
+ h[stats.s_hist_cur].h_fid = *fid;
+ h[stats.s_hist_cur].h_result = result;
+ stats.s_hist_cur++;
+ if (stats.s_hist_cur == ARRAY_SIZE(stats.s_hist))
+ stats.s_hist_cur = 0;
+}
+
struct fid_rec {
struct fid_hash fr_fh;
struct lipe_list_head fr_link;
struct pool_list *tgt_pools; /* target pool */
static void *lamigo_refresh_statfs_thread(void *arg);
-inline int are_agents_busy(void)
+static inline bool are_agents_busy(void)
{
- if (opt.o_iml_re_socket)
- return 0;
-
- if (lamigo_jobs_running >= lamigo_max_jobs)
- return 1;
-
- return 0;
+ return lamigo_jobs_running >= lamigo_max_jobs;
}
/* Convert human readable size string to and int; "1k" -> 1000 */
free(cmd);
}
-static struct lipe_flist *lamigo_flist_init(char *socket)
+#define JOB_FMT \
+ " job%u: { tid: %lu, fid: "DFID", index: %llu, agent: %d, start: %lu, done: %lu, command: %s }\n"
+
+static void lamigo_dump_jobs(FILE *out, struct lipe_list_head *jlist)
{
- struct lipe_flist *flist = flist_alloc(NULL, 1024,
- socket, LDT_UNIX_SOCKET);
- if (flist == NULL) {
- llapi_error(LLAPI_MSG_ERROR|LLAPI_MSG_NO_ERRNO, 0,
- "failed to alloc fid list for socket %s: %s\n",
- socket,
- strerror(errno));
- return NULL;
+ time_t now = time(NULL);
+ struct resync_job *j;
+ int i = 0;
+
+ lipe_list_for_each_entry(j, jlist, rj_list) {
+ char *cmd;
+
+ if (j->rj_setprefer)
+ cmd = "setprefer";
+ else if (j->rj_resync == AMIGO_RESYNC_EXTEND)
+ cmd = "extend";
+ else
+ cmd = "resync";
+
+ fprintf(out, JOB_FMT, i++, j->rj_pid, PFID(&j->rj_fid),
+ j->rj_index, j->rj_agent ? j->rj_agent->rag_index : -1,
+ now - j->rj_start, now - j->rj_done_timestamp, cmd);
}
+}
+
+static char *lamigo_resync2str[] = { "none", "extend", "resync" };
+
+static void lamigo_dump_history(FILE *out)
+{
+ int i = 0, cur = stats.s_hist_cur;
+ char *action;
- return flist;
+ for (i = 0; i < ARRAY_SIZE(stats.s_hist); i++) {
+ if (--cur < 0)
+ cur = ARRAY_SIZE(stats.s_hist) - 1;
+
+ if (stats.s_hist[cur].h_fid.f_seq == 0)
+ break;
+
+ if (stats.s_hist[cur].h_result > AMIGO_RESYNC_RESYNC)
+ action = "unknown";
+ else
+ action = lamigo_resync2str[stats.s_hist[cur].h_result];
+
+ fprintf(out, " hist%d: { fid: "DFID", result: %s }\n", cur,
+ PFID(&stats.s_hist[cur].h_fid), action);
+ }
}
void lamigo_usr1_handle(int sig)
{
struct resync_agent *a;
- struct resync_job *j;
struct pool_list *pl;
struct tm *tmtime;
char timestr[32];
" open: %d\n", pl->pl_pool, pl->pl_ostnr,
pl->pl_avail, pl->pl_total, (int)pl->pl_open);
- if (opt.o_iml_re_socket)
- fprintf(f, " iml_re_socket: %s\n", opt.o_iml_re_socket);
- if (opt.o_iml_ex_socket)
- fprintf(f, " iml_ex_socket: %s\n", opt.o_iml_ex_socket);
-
fprintf(f, "stats:\n"
" read: %lu\n"
" skipped: %lu\n"
" skip_hot: %lu\n"
" ro2hot: %lu\n"
" rw2hot: %lu\n"
- " rw2cold: %lu\n",
+ " rw2cold: %lu\n"
+ " skip-by-rule: %lu\n"
+ " extend-by-pool: %lu\n"
+ " extend-by-objects: %lu\n"
+ " skip_unknown: %lu\n"
+ " resync-stale: %lu\n"
+ " skip-insync: %lu\n"
+ " skip-by-source: %lu\n"
+ " extend-by-target: %lu\n",
stats.s_read, stats.s_skipped, stats.s_processed,
stats.s_removed, stats.s_dups, stats.s_spawned,
stats.s_replicated, stats.s_busy, head.lh_cached_count,
stats.s_skip_hot, stats.s_replicate_ro2hot,
- stats.s_replicate_rw2hot, stats.s_replicate_rw2cold);
+ stats.s_replicate_rw2hot, stats.s_replicate_rw2cold,
+ stats.s_skip_by_rule, stats.s_extend_by_pool,
+ stats.s_extend_by_objects, stats.s_skip_unknown,
+ stats.s_resync_stale, stats.s_skip_insync,
+ stats.s_skip_by_source, stats.s_extend_by_target);
tmtime = localtime(&lamigo_last_cleared);
strftime(timestr, sizeof(timestr), "%c", tmtime);
a->rag_bad ? "inactive" : "active");
}
-#define JOB_FMT \
- " job%u: { tid: %lu, fid: "DFID", index: %llu, agent: %d, start: %lu, command: %s }\n"
-
- i = 0;
fprintf(f, "jobs:\n");
- lipe_list_for_each_entry(j, &lamigo_job_list, rj_list) {
- fprintf(f, JOB_FMT, i++, j->rj_pid, PFID(&j->rj_fid),
- j->rj_index, j->rj_agent->rag_index, j->rj_start,
- j->rj_resync == AMIGO_RESYNC_EXTEND ? "extend" :
- "resync");
- }
+ lamigo_dump_jobs(f, &lamigo_job_list);
+
+ fprintf(f, "failed jobs:\n");
+ lamigo_dump_jobs(f, &lamigo_failed_job_list);
+
+ fprintf(f, "history:\n");
+ lamigo_dump_history(f);
fflush(f);
fclose(f);
if (rc)
return rc;
- if (opt.o_iml_re_socket) {
- opt.o_re_flist = lamigo_flist_init(opt.o_iml_re_socket);
- if (opt.o_re_flist == NULL)
- return -errno;
- opt.o_ex_flist = lamigo_flist_init(opt.o_iml_ex_socket);
- if (opt.o_ex_flist == NULL)
- return -errno;
- }
-
return 0;
}
extern char **environ;
-/* json format: { "fid": "[FID]" }
- * written to either MIRROR or EXTEND socket
- */
-static int lamigo_flist_add_fid(struct lu_fid *fid, int resync)
-{
- int rc;
- const char *output;
- char fid_buf[FID_LEN + 1];
- struct json_object *obj_top;
-
- obj_top = json_object_new_object();
- snprintf(fid_buf, sizeof(fid_buf), DFID, PFID(fid));
- json_object_object_add(obj_top, "fid",
- json_object_new_string(fid_buf));
- output = json_object_to_json_string_ext(obj_top,
- JSON_C_TO_STRING_PLAIN);
- if (resync == AMIGO_RESYNC_RESYNC)
- rc = flist_add_one(opt.o_re_flist, output);
- else // AMIGO_RESYNC_EXTEND
- rc = flist_add_one(opt.o_ex_flist, output);
- json_object_put(obj_top);
-
- return rc;
-}
-
static int lamigo_exec_cmd(struct resync_agent *a, char *cmd)
{
struct resync_ssh_session *rss;
resync = AMIGO_RESYNC_EXTEND;
mo->mo_src_pool = pl;
mo->mo_need_prefer = true;
+ stats.s_extend_by_pool++;
goto out;
}
}
mo->mo_src_pool = found_on;
mo->mo_need_prefer = true;
resync = AMIGO_RESYNC_EXTEND;
+ stats.s_extend_by_objects++;
}
mo->mo_stripes = stripes;
goto out;
if (v3->lmm_magic != LOV_USER_MAGIC_COMP_V1) {
resync = AMIGO_RESYNC_NONE;
+ stats.s_skip_unknown++;
goto out;
}
-
resync = AMIGO_RESYNC_NONE;
mirid = -1;
stripes = 0;
* stale
*/
resync = AMIGO_RESYNC_RESYNC;
+ stats.s_resync_stale++;
goto out;
} else {
/* this is complete uptodate replica
* nothing to do... */
resync = AMIGO_RESYNC_NONE;
+ stats.s_skip_insync++;
goto out;
}
} else {
}
rc = lamigo_get_objects(v3, &objects, &stripes);
- if (rc)
+ if (rc) {
+ stats.s_skip_unknown++;
goto out;
+ }
/* total objects in this mirror */
objs_in_mirror += stripes;
mo->mo_stripes = stripes;
}
- if (!onsrc)
+ if (!onsrc) {
+ stats.s_skip_by_source++;
goto out;
+ }
/* subject to replication */
if (ontgt) {
* we can't count it as a replica
*/
resync = AMIGO_RESYNC_EXTEND;
+ stats.s_extend_by_target++;
} else if (stale) {
/* all mirror's object are on the target pool
* but some component is stale
*/
resync = AMIGO_RESYNC_RESYNC;
+ stats.s_resync_stale++;
goto out;
}
} else {
/* no replica found, make one */
resync = AMIGO_RESYNC_EXTEND;
+ stats.s_extend_by_target++;
}
out:
if (resync == AMIGO_RESYNC_EXTEND)
llapi_printf(LLAPI_MSG_DEBUG,
"skip "DFID" due to rules\n",
PFID(fid));
+ stats.s_skip_by_rule++;
goto out;
}
resync = lamigo_striping_is_in_sync(attrs.loa_lum, src, tgt, mo);
out:
+ lamigo_hist_add(fid, resync);
+
llapi_error(LLAPI_MSG_DEBUG|LLAPI_MSG_NO_ERRNO, 0,
"check "DFID" stripes=%d: resync=%d\n",
PFID(fid), mo->mo_stripes, resync);
{
struct resync_job *srj;
+ lamigo_alr_mark(&rj->rj_fid,
+ rc == 0 ? ALR_TAG_REPLICATED : ALR_TAG_FAILED,
+ ALR_TAG_NO_ACCT);
if (rc)
return;
return 0;
}
- if (opt.o_iml_re_socket) {
- rc = lamigo_flist_add_fid(&f->fr_fh.fh_fid, resync);
- if (rc < 0) {
- llapi_error(LLAPI_MSG_DEBUG|LLAPI_MSG_NO_ERRNO, 0,
- "can't add "DFID": rc=%d\n",
- PFID(&f->fr_fh.fh_fid), rc);
- return 1;
- }
- return 0;
- }
-
rj = calloc(1, sizeof(struct resync_job));
if (rj == NULL) {
llapi_error(LLAPI_MSG_DEBUG|LLAPI_MSG_NO_ERRNO, 0, "can't allocate for a job\n");
if (mo.mo_need_prefer && mo.mo_src_pool) {
rj->rj_callback_data = mo.mo_src_pool;
rj->rj_callback = lamigo_schedule_setprefer;
+ } else {
+ /* just to drop NO_ACCT tag */
+ rj->rj_callback = lamigo_alr_mirror_cb;
}
+
return lamigo_submit_job(rj);
}
}
}
- if (opt.o_iml_re_socket) {
- flist_write(opt.o_re_flist, true);
- flist_write(opt.o_ex_flist, true);
- }
-
return rc;
}
rj->rj_pid, PFID(&rj->rj_fid), time(NULL) - rj->rj_start,
retval, rj->rj_agent->rag_bad);
+ rj->rj_done_timestamp = time(NULL);
+
if (rj->rj_check_job) {
rj->rj_agent->rag_check_in_progress = 0;
if (retval == 0) {
lamigo_agent_count++;
}
-#define LAMIGO_OPT_IML_RE_SOCKET 1
-#define LAMIGO_OPT_IML_EX_SOCKET 2
#define LAMIGO_OPT_POOL_REFRESH 3
#define LAMIGO_OPT_PROGRESS_INTV 4
#define LAMIGO_OPT_MIRROR_CMD 5
{ "heatfn", required_argument, NULL, 'H'},
{ "hot_after_idle", required_argument, NULL, 'I'},
{ "heat-dump", required_argument, NULL, 'W'},
- { "iml-re-socket", required_argument, NULL, LAMIGO_OPT_IML_RE_SOCKET},
- { "iml-ex-socket", required_argument, NULL, LAMIGO_OPT_IML_EX_SOCKET},
{ "max-cache", required_argument, NULL, 'c'},
{ "min-age", required_argument, NULL, 'a'},
{ "mirror-cmd", required_argument, NULL, LAMIGO_OPT_MIRROR_CMD},
exit(1);
}
break;
- case LAMIGO_OPT_IML_RE_SOCKET:
- opt.o_iml_re_socket = strdup(optarg);
- break;
- case LAMIGO_OPT_IML_EX_SOCKET:
- opt.o_iml_ex_socket = strdup(optarg);
- break;
case LAMIGO_OPT_MIRROR_CMD:
opt.o_mirror_cmd = strdup(optarg);
break;
exit(1);
}
- // Indepenedent operation vs IML integration
- if (opt.o_iml_re_socket || opt.o_iml_ex_socket) {
- if (opt.o_iml_re_socket == NULL ||
- opt.o_iml_ex_socket == NULL) {
- llapi_err_noerrno(LLAPI_MSG_ERROR,
- "Both or neither sockets must be defined");
- exit(1);
- }
-
- } else if (lipe_list_empty(&lamigo_agent_list)) {
+ if (lipe_list_empty(&lamigo_agent_list)) {
llapi_err_noerrno(LLAPI_MSG_ERROR, "no agents configured?\n");
exit(1);
}
struct mirror_opts *mo)
{
struct resync_job *rj;
- int rc;
-
- if (opt.o_iml_re_socket) {
- rc = lamigo_flist_add_fid(fid, resync);
- if (rc < 0) {
- llapi_error(LLAPI_MSG_DEBUG|LLAPI_MSG_NO_ERRNO, 0,
- "can't add "DFID": rc=%d\n",
- PFID(fid), rc);
- return 1;
- }
- return 0;
- }
rj = calloc(1, sizeof(struct resync_job));
if (rj == NULL) {
}
resync = lamigo_check_user_rules(attrs, &sysattrs);
- if (resync == AMIGO_RESYNC_NONE)
+ if (resync == AMIGO_RESYNC_NONE) {
+ lamigo_hist_add(&attrs->loa_fid, resync);
goto out;
+ }
resync = lamigo_striping_is_in_sync((void *)attrs->loa_lum, src_pools,
tgt_pools, &mo);
- if (resync == AMIGO_RESYNC_NONE)
+ if (resync == AMIGO_RESYNC_NONE) {
+ lamigo_hist_add(&attrs->loa_fid, resync);
return 0;
+ }
lamigo_check_jobs();
char *o_device;
char *o_dumpfile;
char *o_fids_dumpfile;
- char *o_iml_socket;
char *o_mountpoint;
char *o_pool;
unsigned int o_max_jobs;
void load_config(char *name);
-struct lipe_flist *lflist;
#define LPURGE_FLIST_SIZE (1024 * 1024)
-static int lpurge_init_flist(void)
-{
- if (opt.o_iml_socket) {
- lflist = flist_alloc(NULL, LPURGE_FLIST_SIZE,
- opt.o_iml_socket,
- LDT_UNIX_SOCKET);
- } else {
- llapi_printf(LLAPI_MSG_FATAL,
- "no valid output found, exit\n");
- return -1;
- }
-
- if (lflist == NULL) {
- llapi_printf(LLAPI_MSG_FATAL,
- "failed to alloc fid list, %s\n",
- strerror(errno));
- return -errno;
- }
-
- return 0;
-}
-
static void sig_handler(int signal)
{
psignal(signal, "exiting");
- if (lflist)
- flist_free(lflist);
_exit(0);
}
"\t-h, --freehi, high watermark, %% of space (default: %u)\n"
"\t--help, print this help message\n"
"\t-i, --interval, seconds to next check (default: %u)\n"
- "\t--iml_socket, do not purge, just dump found objects to domain socket\n"
"\t-j, --max_jobs, max.jobs to release replicas (default: %u)\n"
"\t-l, --freelo, low watermark, %% of space (default: %u)\n"
"\t-m, --mds, MDS idx:host:mountpoint (can specify multiple)\n"
"\tpool=<pool>\n"
"\tmds=0:mds1host:/mnt/lustre\n"
"\tmds=1:mds2host:/mnt/lustre2\n");
- printf(" config example dryrun:\n"
- "\tdevice=lustre-OST0000\n"
- "\timl_socket=/tmp/socket.lpurge\n"
- "\tfreelo=20\n"
- "\tfreehi=30\n"
- "\tpool=<pool>\n");
exit(0);
}
if ((attrs->loa_attr_bits & LIPE_OBJECT_ATTR_FILTER_FID) == 0) {
ls->ls_nopfid_objs++;
ls->ls_nopfid_space += attrs->loa_blocks >> 10;
- pthread_mutex_unlock(&ls->ls_mutex);
- goto out;
+ goto out_ls_mutex;
}
/* to avoid N OSTs to 1 MDT scalability issue we only consider
if (attrs->loa_filter_fid.ff_parent.f_ver != 0) {
ls->ls_notfirst_objs++;
ls->ls_notfirst_space += attrs->loa_blocks >> 10;
- pthread_mutex_unlock(&ls->ls_mutex);
- goto out;
+ goto out_ls_mutex;
}
/* if the object has got ost_layout structure which encodes
== 0) {
ls->ls_nomirror_objs++;
ls->ls_nomirror_space += attrs->loa_blocks >> 10;
- pthread_mutex_unlock(&ls->ls_mutex);
- goto out;
+ goto out_ls_mutex;
}
}
if (ls->ls_stored > opt.o_slot_size) {
/* the youngest slot is full */
if (index == 0) {
- pthread_mutex_unlock(&ls->ls_mutex);
- goto out;
+ goto out_ls_mutex;
} else {
ls->ls_found--;
ls->ls_space -= attrs->loa_blocks >> 10;
llapi_printf(LLAPI_MSG_DEBUG, "reclaiming one slot\n");
lpurge_reclaim_slot(index);
- pthread_mutex_lock(&ls->ls_mutex);
ls = lpurge_hist + index;
+ pthread_mutex_lock(&ls->ls_mutex);
ls->ls_found++;
ls->ls_space += attrs->loa_blocks >> 10;
}
lo = calloc(1, sizeof(*lo));
if (lo == NULL)
- goto out;
+ goto out_ls_mutex;
lo->lo_fid = attrs->loa_filter_fid.ff_parent;
lo->lo_blocks = attrs->loa_blocks >> 10;
lipe_list_add(&lo->lo_list, &ls->ls_obj_list);
ls->ls_stored++;
+ llapi_printf(LLAPI_MSG_DEBUG, "add "DFID" to %p/%lu\n",
+ PFID(&attrs->loa_filter_fid.ff_parent), ls, ls->ls_stored);
+
if (ls->ls_max_utime < last_used)
ls->ls_max_utime = last_used;
if (ls->ls_min_utime > last_used)
ls->ls_min_utime = last_used;
- pthread_mutex_unlock(&ls->ls_mutex);
-
- llapi_printf(LLAPI_MSG_DEBUG, "add "DFID" to %p/%lu\n",
- PFID(&attrs->loa_filter_fid.ff_parent), ls, ls->ls_stored);
+out_ls_mutex:
+ pthread_mutex_unlock(&ls->ls_mutex);
out:
/*
* in lazy crawling mode scan for some limited number
}
}
-#define LPURGE_INTERNAL_DUMP_FIDS 1
-#define LPURGE_OPT_IML_SOCKET 2
-#define LPURGE_OPT_VERSION 3
+#define LPURGE_INTERNAL_DUMP_FIDS 1
+#define LPURGE_OPT_VERSION 2
static struct option options[] = {
{ "device", required_argument, NULL, 'D'},
{ "freelo", required_argument, NULL, 'l'},
{ "help", no_argument, NULL, 'H' },
{ "interval", required_argument, NULL, 'i'},
- { "iml_socket", required_argument, NULL, LPURGE_OPT_IML_SOCKET},
- { "listen_socket", required_argument, NULL, 's'},
{ "max_jobs", required_argument, NULL, 'j'},
{ "mds", required_argument, NULL, 'm'},
{ "mount", required_argument, NULL, 'M'},
}
opt.o_scan_rate = value;
break;
- case 's':
- llapi_printf(LLAPI_MSG_WARN,
- "'-s' and '--listen-socket' are deprecated, use '--iml-socket' instead\n");
- case LPURGE_OPT_IML_SOCKET:
- opt.o_iml_socket = strdup(optarg);
- break;
case 'S':
value = strtol(optarg, &endptr, 10);
if (*endptr != '\0' || value < 0) {
opt.o_fids_dumpfile = strdup(buf);
}
- if (opt.o_iml_socket == NULL && lustre_fd < 0) {
+ if (lustre_fd < 0) {
llapi_printf(LLAPI_MSG_ERROR,
"client mountpoint is not defined\n");
exit(1);
sysconf(_SC_NPROCESSORS_ONLN));
}
- // IML Integration
- if (opt.o_iml_socket) {
- int rc;
-
- rc = lpurge_init_flist();
- if (rc)
- exit(rc);
- return;
- }
-
if (opt.o_max_jobs < 1 || opt.o_max_jobs > 1024) {
llapi_printf(LLAPI_MSG_FATAL,
"invalid max_jobs: %d\n", opt.o_max_jobs);
int c;
while ((c = getopt_long(argc, argv,
- "dD:b::nh:l:i:s:j:M:m:p:R:t:S:f:w:?",
+ "dD:b::nh:l:i:j:M:m:p:R:t:S:f:w:?",
options, NULL))
!= EOF) {
/* Don't do further process if invalid option found */
opt.o_device, opt.o_mountpoint, opt.o_pool,
opt.o_max_jobs, opt.o_interval, opt.o_scan_rate,
opt.o_scan_threads, opt.o_slot_size);
- if (opt.o_iml_socket)
- fprintf(f, " iml_socket: %s\n", opt.o_iml_socket);
fprintf(f, "stats:\n"
" scans: %lu\n"