do { typeof(a) __swap_tmp = (a); (a) = (b); (b) = __swap_tmp; } while (0)
#endif
+#define USEC_PER_MSEC 1000
+#define USEC_PER_SEC 1000000
#define DEF_POOL_REFRESH_INTV (10 * 60)
#define DEF_PROGRESS_INTV (10 * 60)
#define DEF_MIN_AGE 600
static LIPE_LIST_HEAD(lamigo_agent_list);
static int lamigo_agent_count;
static int lamigo_max_jobs; /* max jobs for all agents */
-static int lamigo_jobs_running; /* jobs in lamigo_job_list */
-static int lamigo_jobs_delayed; /* jobs in lamigo_failed_job_list */
+static unsigned long lamigo_jobs_running; /* jobs running at the moment */
+static unsigned long lamigo_jobs_delayed; /* jobs in lamigo_failed_job_list */
static char fsname[MAX_OBD_NAME + 1];
static void *chglog_hdlr;
static void lamigo_alr_mirror_cb(struct resync_job *rj, void *cbdata, int rc);
static void lamigo_parse_rules(const char *rule_str, const char *filename);
static void lamigo_sync_hot_files(void);
+static void lamigo_show_progress(void);
struct pool_list *fast_pools; /* fast pools */
struct pool_list *slow_pools; /* slow pool */
{
struct resync_agent *a;
struct pool_list *pl;
- struct tm *tmtime;
+ struct tm tmtime;
char timestr[32];
+ time_t ltime;
FILE *f;
int i;
LX_ERROR("cannot open '%s': %s\n", opt.o_dump_file, strerror(errno));
return;
}
+
+ /* Stamp the dump with the time it was taken: raw epoch seconds plus
+ * a human-readable local time. localtime_r() needs the address of
+ * ltime.
+ */
+ ltime = time(NULL);
+ localtime_r(&ltime, &tmtime);
+ strftime(timestr, sizeof(timestr), "%c", &tmtime);
+
fprintf(f,
"version: %s-%s\n"
"revision: %s\n"
+ "time: %lu # ( %s )\n"
"config:\n"
" chlg_user: %s\n"
" mdtname: %s\n"
" mountpoint: %s\n"
" fast_pool: ",
PACKAGE_VERSION, LIPE_RELEASE, LIPE_REVISION,
+ (unsigned long)ltime, timestr, /* time_t is not guaranteed to match %lu */
lamigo_changelog_user, opt.o_mdtname, opt.o_mntpt);
i = 0;
for (pl = fast_pools; pl != NULL; pl = pl->pl_next, i++)
stats.s_resync_stale, stats.s_skip_insync,
stats.s_skip_by_source, stats.s_extend_by_target);
- tmtime = localtime(&lamigo_last_cleared);
- strftime(timestr, sizeof(timestr), "%c", tmtime);
+ localtime_r(&lamigo_last_cleared, &tmtime);
+ strftime(timestr, sizeof(timestr), "%c", &tmtime);
fprintf(f, "changelog:\n"
" last_processed_idx: %llu\n"
{
int rc;
- /* Make the hash table size under 0.1% of the total cache size.
- * It is unlikely there are millions of FIDs in cache, stop at 1M.
+ /* Calculate number of hash buckets as
+ * number of unique cached FID's/4, but no more than 2M buckets
+ * (assuming that each FID takes 256B in cache memory)
*/
while (fid_hash_shift < 20 &&
- 8 << fid_hash_shift < opt.o_cache_size>>10)
+ 4 << fid_hash_shift < opt.o_cache_size >> 8)
fid_hash_shift++;
rc = fid_hash_init(&head.lh_hash);
if (rc)
- return rc;
+ /* newline-terminated like the other LX_FATAL messages */
+ LX_FATAL("OOM: Could not allocate FID hash (%d)\n",
+ fid_hash_shift);
return 0;
}
rj->rj_pid = pid;
a->rag_jobs++;
stats.s_spawned++;
+ pthread_setname_np(pid, "lamigo-job");
rc = 0;
out:
return rc;
(void *)a);
if (rc < 0)
return;
+
+ pthread_setname_np(pid, "lamigo-job-ca");
+
rj = xcalloc(1, sizeof(*rj));
rj->rj_check_job = 1;
rj->rj_pid = pid;
if (now - progress_timestamp < opt.o_progress_interval)
return;
progress_timestamp = now;
- if (stats.s_processed == progress_last_processed)
- return;
progress_last_processed = stats.s_processed;
+ lamigo_alr_stat_avg();
+
LX_INFO("%lu processed, %lu replicated, %lu busy, %lu running, "
"%lu delayed, %lu awaiting, %lu hot skipped, %lu ro2hot, "
- "%lu rw2hot, %lu rw2cold\n",
+ "%lu rw2hot, %lu rw2cold "
+
+ "%lu alr, %d alrm, %lu alrmmax, "
+ "%lu alrtavg, %lu alrtavgmax, %lu.%03lu alrt, %lu.%03lu alrtmax, "
+ "%lu hr, %lu hrmax, %lu srtmax, "
+ "%lu.%03lu heat_time_max, "
+ "%lu alravg, %lu alravgmax, "
+ "%lu alrheatexc\n",
+
stats.s_processed,
stats.s_replicated,
stats.s_busy,
stats.s_skip_hot,
stats.s_replicate_ro2hot,
stats.s_replicate_rw2hot,
- stats.s_replicate_rw2cold);
+ stats.s_replicate_rw2cold,
+
+ /* Total received ALR's */
+ alr_stats.s_alr_r,
+
+ /* Current allocated received ALR's */
+ (int)(alr_stats.s_alr_r-alr_stats.s_alr_m),
+
+ /* Max received allocated ALR's */
+ alr_stats.s_alr_m_max,
+
+ /* Current Avg receive time per 1000 ALR's in us */
+ alr_stats.s_alr_t_avg,
+
+ /* Max Avg receive time per 1000 ALR's in us */
+ alr_stats.s_alr_t_avg_max,
+
+ /* Current ALR receive time per set by heat thread */
+ alr_stats.s_alr_t / USEC_PER_SEC,
+ (alr_stats.s_alr_t/USEC_PER_MSEC) % 1000,
+
+ /* Max ALR receive time per set by heat thread */
+ alr_stats.s_alr_t_max / USEC_PER_SEC,
+ (alr_stats.s_alr_t_max/USEC_PER_MSEC) % 1000,
+
+ alr_stats.s_alr_hr, /* ALR hash size */
+ alr_stats.s_alr_hr_max, /* Max ALR hash size */
+ alr_stats.s_alr_srt_max, /* Max ALR's sorted */
+
+ /* Heat thread One cycle elapsed time */
+ alr_stats.s_alr_heat_time_max / USEC_PER_SEC,
+ (alr_stats.s_alr_heat_time_max/USEC_PER_MSEC) % 1000,
+ alr_stats.s_alr_avg, /* ALR received avg */
+ alr_stats.s_alr_avg_max, /* and avg max */
+ alr_stats.s_alr_heat_time_exc); /* Heat thread took longer */
+ /* o_alr_period_time */
}
static void lamigo_lock_pidfile(void)
if (rc != 0)
LX_FATAL("cannot start signal thread: %s\n", strerror(rc));
+ pthread_setname_np(lamigo_signal_thread_id, "lamigo-sig");
+
lipe_version_init();
ssh_threads_set_callbacks(ssh_threads_get_pthread());
ssh_init();
LX_INFO("version %s-%s, revision %s\n",
PACKAGE_VERSION, LIPE_RELEASE, LIPE_REVISION);
- rc = lamigo_init_cache();
- if (rc < 0)
- LX_FATAL("cannot init cache\n");
+ lamigo_init_cache();
/* create and lock pidfile to protect against another instance */
lamigo_lock_pidfile();
if (rc != 0)
LX_FATAL("cannot start statfs thread: %s\n", strerror(rc));
+ pthread_setname_np(lamigo_refresh_statfs_thread_id, "lamigo-statfs");
+
if (opt.o_rescan) {
/* scan the whole MDT and replicate matched files */
rc = lamigo_rescan();
#define KEEP_ALIVE_MSG "# keepalive"
#define SSH_POLL_TIMEOUT_SEC (opt.o_alr_ofd_interval * 8)
+#define ALR_STATS_INTERVAL (opt.o_progress_interval) /* interval to calculate average alr rate */
struct alr_agent {
char *ala_host;
struct lipe_list_head ala_list;
struct lipe_ssh_context ala_ctx;
- pthread_t ala_pid;
time_t ala_last_msg_received;
time_t ala_last_alr_restarted;
int keepalive_interval;
+ time_t ala_time_stat_start; /* When ALR_STATS_INTERVAL started */
+ unsigned long ala_total; /* Total records from this host */
+ unsigned long ala_cnt_start; /* Start counter for last ALR_STATS_INTERVAL */
+ unsigned long ala_avg; /* Last avg alr's per second */
+ unsigned long ala_avg_max; /* Max average alr's per second */
};
static LIPE_LIST_HEAD(alr_agent_list);
struct alr_period {
pthread_mutex_t alp_mutex;
struct lipe_list_head alp_list;
+ time_t alp_time; /* For statistics */
unsigned long alp_period;
int alp_nr;
unsigned long alp_max[2];
*/
static struct alr_head {
struct fid_hash_head alh_hash;
- unsigned long alh_count;
struct alr_period *alh_period;
int alh_lidx;
} alr_head;
static struct alr_rec_temp *art_queue = NULL;
static pthread_mutex_t art_mutex = PTHREAD_MUTEX_INITIALIZER;
static int mdtidx;
+struct alr_stats alr_stats;
-static void lamigo_alr_update_one(struct lu_fid *fid, enum alr_rw rw,
+static void lamigo_alr_update_one(struct alr_agent *ala,
+ struct lu_fid *fid, enum alr_rw rw,
unsigned long ops, unsigned long long iosize,
unsigned long long maxoff, enum alr_pool pool)
{
struct alr_rec_temp *t;
+ time_t interval;
t = xcalloc(sizeof(*t), 1);
t->art_fid = *fid;
pthread_mutex_lock(&art_mutex);
t->art_next = art_queue;
art_queue = t;
+ alr_stats.s_alr_r++;
pthread_mutex_unlock(&art_mutex);
+
+ ala->ala_total++; /* Counter per agent thread */
+ interval = ala->ala_last_msg_received - ala->ala_time_stat_start;
+
+ /* Calculate average rate */
+ if (interval > ALR_STATS_INTERVAL) {
+ unsigned long cnt = ala->ala_total - ala->ala_cnt_start;
+
+ ala->ala_cnt_start = ala->ala_total;
+ ala->ala_time_stat_start = ala->ala_last_msg_received;
+ ala->ala_avg = cnt / interval;
+
+ if (ala->ala_avg_max < ala->ala_avg)
+ ala->ala_avg_max = ala->ala_avg;
+ }
}
-static void lamigo_alr_parse_one(const char *host, const char *line)
+static void lamigo_alr_parse_one(struct alr_agent *ala, const char *line)
{
unsigned long long timestamp, begin, end, iosize, segments, count;
char *ostidx, *strend;
if (strncmp(line, KEEP_ALIVE_MSG, sizeof(KEEP_ALIVE_MSG)) == 0) {
/* Found "# keepalive" */
restart_log_reader_on_timeout = true;
- LX_DEBUG("keepalive msg from host:'%s'\n", host);
+ LX_DEBUG("keepalive msg from host:'%s'\n", ala->ala_host);
return;
}
pool = ALR_SLOW;
}
LX_DEBUG("ALRUPD: H: %s "DFID" RW: %c O: %llu IO: %llu P: %c\n",
- host, PFID(&fid), rw, count, iosize,
+ ala->ala_host, PFID(&fid), rw, count, iosize,
pool == ALR_FAST ? 'f' : (pool == ALR_SLOW ? 's' : 'n'));
- lamigo_alr_update_one(&fid, rw == 'w' ? ALR_WRITE : ALR_READ,
+ lamigo_alr_update_one(ala, &fid, rw == 'w' ? ALR_WRITE : ALR_READ,
count, iosize, end, pool);
}
-static int lamigo_alr_parse(const char *host, char *buf, int size, int *received)
+static int lamigo_alr_parse(struct alr_agent *ala, char *buf, int size, int *received)
{
int len, offset = 0;
char *p;
len = p - (buf + offset);
size -= len + 1;
*p = 0;
- lamigo_alr_parse_one(host, buf + offset);
+ lamigo_alr_parse_one(ala, buf + offset);
(*received)++;
offset += len + 1;
} while (size > 0);
}
break;
}
- offset = lamigo_alr_parse(ala->ala_host, buffer, offset + rc, &received);
+ offset = lamigo_alr_parse(ala, buffer, offset + rc, &received);
now = time(NULL);
ala->ala_last_msg_received = now;
if (now - last_checked > opt.o_progress_interval) {
p->alp_nr--;
fid_hash_del(&alr_head.alh_hash, &f->ar_fh);
free(f);
+ alr_stats.s_alr_hr--;
}
pthread_mutex_unlock(&p->alp_mutex);
}
sa[i].ase_ptr = f;
i++;
}
+
+ if (alr_stats.s_alr_srt_max < nr)
+ alr_stats.s_alr_srt_max = nr;
+
qsort(sa, nr, sizeof(*sa), alr_sort_compare);
/* refill the list according to sorting */
for (i = 0; i < nr; i++) {
p->alp_nr++;
f->ar_idx = idx;
pthread_mutex_unlock(&p->alp_mutex);
+ alr_stats.s_alr_hr++;
fid_hash_add(&alr_head.alh_hash, &f->ar_fh);
} else {
f->ar_pools[pool] += ops;
}
+#define USEC_PER_SEC 1000000
+#define NSEC_PER_SEC 1000000000
+/* Read CLOCK_MONOTONIC and return it in microseconds.
+ * Returns 0 if clock_gettime() reports an error.
+ */
+static unsigned long getus(void)
+{
+ struct timespec ts;
+
+ if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0)
+ return 0;
+
+ /* whole seconds scaled up, nanoseconds scaled down */
+ return ts.tv_sec * USEC_PER_SEC +
+ ts.tv_nsec / (NSEC_PER_SEC / USEC_PER_SEC);
+}
+
static void lamigo_alr_process_temp(void)
{
struct alr_rec_temp *f, *n;
+ unsigned long start_time; /* getus() reading taken just before the drain loop */
+ unsigned long avg_time, t; /* t: microseconds spent draining this batch */
+ unsigned long count; /* s_alr_r - s_alr_m, sampled under art_mutex */
/* take over all collected stats */
pthread_mutex_lock(&art_mutex);
f = art_queue;
art_queue = NULL;
+ count = alr_stats.s_alr_r - alr_stats.s_alr_m;
pthread_mutex_unlock(&art_mutex);
+ if (!f) /* queue was empty: leave all statistics untouched */
+ return;
+
+ start_time = getus();
+
+ if (count > alr_stats.s_alr_m_max) /* remember the largest backlog seen */
+ alr_stats.s_alr_m_max = count;
+
/* process them and insert into the global heat table */
while (f) {
lamigo_alr_process_temp_one(&f->art_fid, f->art_rw, f->art_ops,
f->art_iosize, f->art_maxoff,
f->art_pool);
n = f->art_next;
+ alr_stats.s_alr_m++; /* one more queued record consumed */
free(f);
f = n;
}
-}
+ t = getus() - start_time;
+
+ alr_stats.s_alr_t = t;
+ if (t > alr_stats.s_alr_t_max)
+ alr_stats.s_alr_t_max = t;
+
+ /* NOTE(review): divides by count; presumably non-zero whenever f != NULL - confirm */
+ avg_time = t * 1000 / count; /* Avg time per 1000 records */
+
+ alr_stats.s_alr_t_avg = avg_time;
+
+ if (avg_time > alr_stats.s_alr_t_avg_max)
+ alr_stats.s_alr_t_avg_max = avg_time;
+ /* Update hash size stats */
+ if (alr_stats.s_alr_hr > alr_stats.s_alr_hr_max)
+ alr_stats.s_alr_hr_max = alr_stats.s_alr_hr;
+}
/* this thread maintains heat */
static void *lamigo_alr_heat_thread(void *arg)
{
- unsigned long last = time(NULL);
+ time_t last = time(NULL);
+ unsigned long heat_time;
+ unsigned long start_heat_time;
+
struct alr_period *p;
+ const int last_p_idx = opt.o_alr_periods-1;
while (1) {
/* check for new records every second */
sleep(1);
+
/* add all new records into the heat table */
lamigo_alr_process_temp();
continue;
last = time(NULL);
+ start_heat_time = getus();
+
lamigo_alr_update_heat_all();
lamigo_clean_oldest_list();
/* go to the next period */
alr_period++;
- if (++alr_head.alh_lidx == opt.o_alr_periods)
- alr_head.alh_lidx = 0;
+
+ alr_head.alh_lidx =
+ alr_head.alh_lidx == last_p_idx ?
+ 0 : alr_head.alh_lidx+1;
/* mark the period so we can find it by number later */
p = &alr_head.alh_period[alr_head.alh_lidx];
pthread_mutex_lock(&p->alp_mutex);
+ p->alp_time = time(NULL); /* for statistics */
p->alp_period = alr_period;
pthread_mutex_unlock(&p->alp_mutex);
+
+ /* time spent on this heat-maintenance cycle, in microseconds */
+ heat_time = getus() - start_heat_time;
+ if (heat_time > alr_stats.s_alr_heat_time_max)
+ alr_stats.s_alr_heat_time_max = heat_time;
+
+ /* Count cycles that ran longer than one ALR period. Widen before
+ * multiplying so the product is not computed in int.
+ */
+ if (heat_time > (unsigned long)opt.o_alr_period_time * USEC_PER_SEC)
+ alr_stats.s_alr_heat_time_exc++;
}
return NULL;
rc = fid_hash_init(&alr_head.alh_hash);
if (rc)
- LX_OOM(-1);
+ /* newline-terminated like the other LX_FATAL messages */
+ LX_FATAL("OOM: Could not allocate FID hash (%d)\n",
+ fid_hash_shift);
alr_head.alh_period = xcalloc(sizeof(*alr_head.alh_period),
opt.o_alr_periods);
return;
lipe_list_for_each_entry(ala, &alr_agent_list, ala_list) {
- rc = pthread_create(&ala->ala_pid, NULL,
+ char tname[16];
+
+ rc = pthread_create(&pid, NULL,
lamigo_alr_data_collection_thread, ala);
if (rc)
- LX_FATAL("cannot start access log reader: %s\n", strerror(rc));
+ LX_FATAL("cannot start access log reader: %s\n",
+ strerror(rc));
+
+ snprintf(tname, sizeof(tname), "lamigo-alr-%s", ala->ala_host);
+ pthread_setname_np(pid, tname);
}
rc = pthread_create(&pid, NULL, lamigo_alr_heat_thread, NULL);
if (rc)
LX_FATAL("cannot start heat-maint thread: %s\n", strerror(rc));
+ pthread_setname_np(pid, "lamigo-heat");
+
alr_initialized = true;
}
+/* Add up the current and peak average rates of every ALR agent and
+ * store the two sums in alr_stats. Does nothing until the ALR
+ * subsystem has been initialized.
+ */
+void lamigo_alr_stat_avg(void)
+{
+ unsigned long rate_sum = 0;
+ unsigned long peak_sum = 0;
+ struct alr_agent *agent;
+
+ if (!alr_initialized)
+ return;
+
+ lipe_list_for_each_entry(agent, &alr_agent_list, ala_list) {
+ rate_sum += agent->ala_avg;
+ peak_sum += agent->ala_avg_max;
+ }
+
+ alr_stats.s_alr_avg_max = peak_sum;
+ alr_stats.s_alr_avg = rate_sum;
+}
+
void lamigo_alr_dump_stat(FILE *f)
{
struct alr_agent *ala;
+ int i;
if (!alr_initialized)
return;
fprintf(f, "access_log_readers:\n");
lipe_list_for_each_entry(ala, &alr_agent_list, ala_list) {
- fprintf(f, " - host:'%s'\n", ala->ala_host);
- fprintf(f, " last_message_at: %lu\n",
- ala->ala_last_msg_received);
+ /* one entry per access-log reader: host, last message time,
+ * total records and the current/peak average rate
+ */
+ fprintf(f, " - host:'%s'\n"
+ " last_message_at: %lu\n"
+ " total_alr: %lu\n"
+ " avg_rate: %lu\n"
+ " avg_rate_max: %lu\n",
+ ala->ala_host,
+ (unsigned long)ala->ala_last_msg_received, /* time_t printed via %lu */
+ ala->ala_total,
+ ala->ala_avg,
+ ala->ala_avg_max);
}
}
ala = xcalloc(1, sizeof(*ala));
ala->ala_host = xstrdup(host);
+ ala->ala_time_stat_start = time(NULL); /* No accuracy required */
rc = lipe_ssh_context_init(&ala->ala_ctx, ala->ala_host);
assert(rc == SSH_OK);
struct alr_rec *f, *tmp;
struct alr_period *p;
int idx;
+ unsigned int total = 0; /* records counted across all periods */
+ fprintf(file, "lidx:%d alr_period:%lu\n", (int)alr_head.alh_lidx, alr_period);
for (idx = 0; idx < opt.o_alr_periods; idx++) {
+ unsigned int j = 0; /* records on this period's list */
p = &alr_head.alh_period[idx];
pthread_mutex_lock(&p->alp_mutex);
+ fprintf(file, "***** %d) time:%lu period:%lu)\n", idx, (unsigned long)p->alp_time, p->alp_period);
lipe_list_for_each_entry_safe(f, tmp, &p->alp_list, ar_link) {
+ j++;
lamigo_alr_dump_one(file, f);
}
pthread_mutex_unlock(&p->alp_mutex);
+ /* j and total are unsigned int, so print them with %u */
+ fprintf(file, "%d) %u\n", idx, j);
+ total += j;
}
+ fprintf(file, "total=%u\n", total);
}