int rag_index;
int rag_maxjobs;
int rag_jobs;
+ unsigned long rag_total_jobs;
bool rag_bad;
bool rag_is_local;
bool rag_opts_checked;
pthread_mutex_t rag_ssh_lock;
pthread_cond_t rag_ssh_cond;
char rag_extra_options[128];
+ unsigned long rag_s_ssh_err; /* SSH error counter */
+ unsigned long rag_s_inact_cnt; /* Agent disconnected counter */
};
struct resync_job;
__u64 rj_index;
pthread_t rj_pid;
int rj_resync;
- int rj_setprefer;
- int rj_check_job;
+ bool rj_setprefer;
+ bool rj_check_job;
int rj_stripes;
int rj_stripe_size;
struct pool_list *rj_pool;
unsigned long s_skip_by_source;
unsigned long s_extend_by_target;
unsigned long s_job_timedout;
+ unsigned long s_all_agents_inact; /* All agents inactive */
struct history s_hist[100];
int s_hist_cur;
};
lamigo_last_cleared, timestr);
#define AGENT_FMT \
- " agent%d: { host: %s, mnt: %s, jobs: %u, max: %u, state: %s }\n"
+ " agent%d: { host: %s, mnt: %s, jobs: %u, max: %u, state: %s, ssh_err: %lu/%lu, inact_cnt: %lu }\n"
fprintf(f, "agents:\n");
lipe_list_for_each_entry(a, &lamigo_agent_list, rag_list) {
fprintf(f, AGENT_FMT, a->rag_index, a->rag_hostname,
a->rag_mountpoint, a->rag_jobs, a->rag_maxjobs,
- a->rag_bad ? "inactive" : "active");
+ a->rag_bad ? "inactive" : "active",
+ a->rag_s_ssh_err, a->rag_total_jobs, a->rag_s_inact_cnt);
}
+ fprintf(f, "all_agents_inact: %lu\n", stats.s_all_agents_inact);
fprintf(f, "jobs:\n");
lamigo_dump_jobs(f, &lamigo_job_list);
- fprintf(f, "failed jobs:\n");
+ fprintf(f, "delayed jobs:\n");
lamigo_dump_jobs(f, &lamigo_failed_job_list);
fprintf(f, "history:\n");
return rc;
}
+static void check_bad_agent(struct resync_agent *agent, intptr_t retval)
+{
+ if (retval == SSH_ERROR) { /* couldn't connect */
+ LX_ERROR("SSH error on host '%s'\n", agent->rag_hostname);
+ agent->rag_s_ssh_err++;
+ }
+
+ if (agent->rag_bad)
+ return;
+
+ agent->rag_bad = true;
+ assert(lamigo_max_jobs >= agent->rag_maxjobs);
+ lamigo_max_jobs -= agent->rag_maxjobs;
+ agent->rag_opts_checked = false;
+ if (lamigo_max_jobs != 0) {
+ LX_WARN("agent '%s' is bad. New job limit: %lu\n",
+ agent->rag_hostname, lamigo_max_jobs);
+ } else {
+ LX_ERROR("agent '%s' is bad. All agents disconnected\n",
+ agent->rag_hostname);
+ stats.s_all_agents_inact++;
+ }
+ agent->rag_s_inact_cnt++;
+}
+
static void lamigo_agent_check_options(struct resync_agent *agent)
{
struct resync_ssh_session *rss;
}
if (rc != 0) {
- LX_ERROR("getting lfs options failed\n");
+ LX_ERROR("getting lfs options on '%s' failed\n",
+ agent->rag_hostname);
+ check_bad_agent(agent, SSH_ERROR);
+
goto out;
}
}
}
if (!a) {
- LX_DEBUG("no good agent\n");
+ /* This must not happen. We never spawn replication
+ * when no agents available
+ */
+ LX_ERROR("no good agent\n");
rc = -EBUSY;
goto out;
}
rj->rj_pid = pid;
a->rag_jobs++;
+ a->rag_total_jobs++;
stats.s_spawned++;
pthread_setname_np(pid, "lamigo-job");
rc = 0;
pthread_setname_np(pid, "lamigo-job-ca");
rj = xcalloc(1, sizeof(*rj));
- rj->rj_check_job = 1;
+ rj->rj_check_job = true;
rj->rj_pid = pid;
rj->rj_agent = a;
rj->rj_start = time(NULL);
rj->rj_pool = NULL;
rj->rj_mirror_opts = NULL;
a->rag_check_in_progress = true;
+ /* Add to job list, but do not count */
lipe_list_add_tail(&rj->rj_list, &lamigo_job_list);
}
}
srj = xcalloc(1, sizeof(*srj));
srj->rj_fid = rj->rj_fid;
- srj->rj_setprefer = 1;
+ srj->rj_setprefer = true;
/* XXX: few src pools? */
srj->rj_pool = fast_pools;
if (are_agents_busy()) {
/* all the agents are busy */
- LX_DEBUG("no agents avilable (max: %d)\n", lamigo_max_jobs);
+ LX_DEBUG("no agents available (max: %d)\n", lamigo_max_jobs);
return 1;
}
static void lamigo_job_fini(struct resync_job *rj, intptr_t retval)
{
+ struct resync_agent *agent = rj->rj_agent;
+
rj->rj_done_timestamp = time(NULL);
- rj->rj_try++;
if (rj->rj_check_job) {
- rj->rj_agent->rag_check_in_progress = false;
+ agent->rag_check_in_progress = false;
if (retval == 0) {
/* the agent is back */
- if (rj->rj_agent->rag_bad) {
- LX_DEBUG("agent %s is back\n",
- rj->rj_agent->rag_hostname);
- rj->rj_agent->rag_bad = false;
- lamigo_max_jobs += rj->rj_agent->rag_maxjobs;
+ if (agent->rag_bad) {
+ LX_INFO("agent '%s' is back\n",
+ agent->rag_hostname);
+ agent->rag_bad = false;
+ lamigo_max_jobs += agent->rag_maxjobs;
}
} else {
- /* the agent is still bad */
- if (rj->rj_agent->rag_bad == false) {
- LX_DEBUG("agent %s is bad\n",
- rj->rj_agent->rag_hostname);
-
- assert(lamigo_max_jobs >= rj->rj_agent->rag_maxjobs);
- lamigo_max_jobs -= rj->rj_agent->rag_maxjobs;
- rj->rj_agent->rag_bad = true;
- rj->rj_agent->rag_opts_checked = false;
- }
+ check_bad_agent(agent, retval);
}
free(rj);
return;
LX_DEBUG("job %lu on "DFID" done in %lu: %"PRIdPTR" (%d)\n",
rj->rj_pid, PFID(&rj->rj_fid), rj->rj_done_timestamp - rj->rj_start,
- retval, rj->rj_agent->rag_bad);
+ retval, agent->rag_bad);
if (rj->rj_callback)
rj->rj_callback(rj, rj->rj_callback_data, retval);
- assert(rj->rj_agent->rag_jobs > 0);
- rj->rj_agent->rag_jobs--;
- lamigo_jobs_running--;
-
if (retval == EBUSY) { /* Must not happen */
/*
* the file was busy, there will be another CLOSE
*/
stats.s_busy++;
} else if (retval == SSH_ERROR || retval == -ETIMEDOUT) {
- /* couldn't connect */
- /* XXX: resubmit the job */
- if (!rj->rj_agent->rag_bad) {
- rj->rj_agent->rag_bad = true;
- rj->rj_agent->rag_opts_checked = false;
- lamigo_max_jobs -= rj->rj_agent->rag_maxjobs;
+ check_bad_agent(agent, retval);
+ if (retval == SSH_ERROR) {
+ /* Resubmit the job */
+ rj->rj_pid = 0; /* This is used in stats dump */
+ rj->rj_agent = NULL;
+ rj->rj_try++;
+ lipe_list_add_tail(&rj->rj_list, &lamigo_failed_job_list);
+ lamigo_jobs_delayed++;
+ return;
}
- rj->rj_pid = 0;
- rj->rj_agent = NULL;
-
/*
* if we interrupted the job with ETIMEDOUT, then
* ssh was doing OK and the changelog got new
* OPEN/CLOSE events which will cause another try
*/
- if (retval == -ETIMEDOUT) {
- stats.s_job_timedout++;
- } else if (retval == SSH_ERROR) {
- lipe_list_add_tail(&rj->rj_list, &lamigo_failed_job_list);
- lamigo_jobs_delayed++;
- rj = NULL;
- }
-
+ stats.s_job_timedout++;
} else if (retval == 127) {
/* likely invalid setup on the agent (missing lfs?) */
/* XXX: resubmit the job */
stats.s_replicated++;
}
- if (rj)
- free(rj);
+ free(rj);
}
static void lamigo_check_jobs(void)
lipe_list_for_each_entry_safe(rj, tmp, &lamigo_job_list, rj_list) {
rc = pthread_tryjoin_np(rj->rj_pid, (void **)&retval);
- if (rc == EBUSY) {
+ if (rc == EBUSY)
continue;
- } else if (rc != 0) {
+
+ lipe_list_del(&rj->rj_list);
+ /* Check agent job is not counted */
+ if (!rj->rj_check_job) {
+ assert(lamigo_jobs_running > 0);
+ lamigo_jobs_running--;
+
+ assert(rj->rj_agent->rag_jobs > 0);
+ rj->rj_agent->rag_jobs--;
+ }
+
+ if (rc != 0) {
+ /* Should not happen. */
LX_ERROR("cannot join thread %lld: %s\n",
(long long)rj->rj_pid, strerror(rc));
+ free(rj);
} else {
- lipe_list_del(&rj->rj_list);
lamigo_job_fini(rj, retval);
}
}
if (strcmp(host, "-") == 0) {
a->rag_hostname = xstrdup("localhost");
- a->rag_is_local = 1;
+ a->rag_is_local = true;
} else {
a->rag_hostname = xstrdup(host);
}
rj->rj_fid = *fid;
rj->rj_stripes = stripes;
rj->rj_stripe_size = stripesz;
- rj->rj_index = 0;
rj->rj_resync = sync;
- rj->rj_check_job = 0;
- rj->rj_pid = 0;
- rj->rj_pool = pl;
+ rj->rj_pool = pl->pl_pool;
/* only fast pool mirrors should be marked "prefer" */
rj->rj_mirror_opts = pl->pl_is_fast ? "prefer" : NULL;
rj->rj_callback = lamigo_alr_mirror_cb;
rc = lamigo_submit_job(rj);
if (rc == 1) {
/* probably a dedicated list would be better */
+ rj->rj_try++;
lipe_list_add_tail(&rj->rj_list, &lamigo_failed_job_list);
lamigo_jobs_delayed++;
}