struct hsm_scan_request *request;
};
+/* Argument bundle handed to the coordinator kthread at kthread_run()
+ * time: the MDT thread info to operate under, plus the preallocated
+ * hsd.request[] vector. The vector is allocated by the starter and
+ * freed by the coordinator thread when it exits (or by the starter if
+ * kthread_run() fails). */
+struct hsm_thread_data {
+ struct mdt_thread_info *cdt_mti;
+ struct hsm_scan_request *request;
+};
/**
* llog_cat_process() callback, used to:
* - find waiting request and start action
+/* Allowed coordinator state transitions: cdt_transition[from][to].
+ * NOTE(review): init->stopping and stopping->stopping are disallowed by
+ * this change, presumably because shutdown is now driven by
+ * kthread_stop()/kthread_should_stop() instead of a CDT_STOPPING
+ * handshake — confirm no set_cdt_state() caller still relies on the
+ * removed transitions. */
static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
/* from -> to: stopped init running disable stopping */
/* stopped */ { true, true, false, false, false },
- /* init */ { true, false, true, false, true },
+ /* init */ { true, false, true, false, false },
/* running */ { false, false, true, true, true },
/* disable */ { false, false, true, true, true },
- /* stopping */ { true, false, false, false, true }
+ /* stopping */ { true, false, false, false, false }
};
/**
*/
static int mdt_coordinator(void *data)
{
- struct mdt_thread_info *mti = data;
+ struct hsm_thread_data *thread_data = data;
+ struct mdt_thread_info *mti = thread_data->cdt_mti;
struct mdt_device *mdt = mti->mti_mdt;
struct coordinator *cdt = &mdt->mdt_coordinator;
struct hsm_scan_data hsd = { NULL };
+ time64_t wait_event_time = 1 * HZ;
+ time64_t last_housekeeping = 0;
int rc = 0;
int request_sz;
ENTRY;
- CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
- mdt_obd_name(mdt), current_pid());
-
- /* we use a copy of cdt_max_requests in the cb, so if cdt_max_requests
- * increases due to a change from /proc we do not overflow the
- * hsd.request[] vector
- */
+ /* set up hsd->request and max_requests */
hsd.max_requests = cdt->cdt_max_requests;
request_sz = hsd.max_requests * sizeof(*hsd.request);
- OBD_ALLOC_LARGE(hsd.request, request_sz);
- if (!hsd.request)
- GOTO(out, rc = -ENOMEM);
+ hsd.request = thread_data->request;
+
+ CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
+ mdt_obd_name(mdt), current_pid());
hsd.mti = mti;
obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
wake_up_all(&cdt->cdt_waitq);
while (1) {
- struct l_wait_info lwi;
int i;
- lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
- NULL, NULL);
- l_wait_event(cdt->cdt_waitq,
- cdt->cdt_event || (cdt->cdt_state == CDT_STOPPING),
- &lwi);
+ /* Limit execution of the expensive requests traversal
+ * to at most once every "wait_event_time" jiffies. This avoids
+ * repeatedly locking/unlocking the catalog for each request,
+ * which would block other HSM operations from happening. */
+ wait_event_interruptible_timeout(cdt->cdt_waitq,
+ kthread_should_stop(),
+ wait_event_time);
CDEBUG(D_HSM, "coordinator resumes\n");
- if (cdt->cdt_state == CDT_STOPPING) {
+ if (kthread_should_stop()) {
+ CDEBUG(D_HSM, "Coordinator stops\n");
rc = 0;
break;
}
- cdt->cdt_event = false;
-
/* if coordinator is suspended continue to wait */
if (cdt->cdt_state == CDT_DISABLE) {
CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
continue;
}
+ /* If there is no event and no housekeeping is due, keep
+ * waiting. */
+ if (last_housekeeping + cdt->cdt_loop_period <= get_seconds())
+ last_housekeeping = get_seconds();
+ else if (!cdt->cdt_event)
+ continue;
+
+ cdt->cdt_event = false;
+
CDEBUG(D_HSM, "coordinator starts reading llog\n");
if (hsd.max_requests != cdt->cdt_max_requests) {
/* cdt_max_requests has changed,
* we need to allocate a new buffer
*/
- OBD_FREE_LARGE(hsd.request, request_sz);
- hsd.max_requests = cdt->cdt_max_requests;
- request_sz = hsd.max_requests * sizeof(*hsd.request);
- OBD_ALLOC_LARGE(hsd.request, request_sz);
- if (!hsd.request) {
- rc = -ENOMEM;
- break;
+ struct hsm_scan_request *tmp = NULL;
+ int max_requests = cdt->cdt_max_requests;
+ OBD_ALLOC_LARGE(tmp, max_requests *
+ sizeof(struct hsm_scan_request));
+ if (!tmp) {
+ CERROR("Failed to resize request buffer, "
+ "keeping it at %d\n",
+ hsd.max_requests);
+ cdt->cdt_max_requests = hsd.max_requests;
+ } else {
+ OBD_FREE_LARGE(hsd.request, request_sz);
+ hsd.max_requests = max_requests;
+ request_sz = hsd.max_requests *
+ sizeof(struct hsm_scan_request);
+ hsd.request = tmp;
}
}
OBD_FREE(request->hal, request->hal_sz);
}
}
- EXIT;
-out:
- set_cdt_state(cdt, CDT_STOPPING, NULL);
if (hsd.request)
OBD_FREE_LARGE(hsd.request, request_sz);
mdt_hsm_cdt_cleanup(mdt);
- set_cdt_state(cdt, CDT_STOPPED, NULL);
- wake_up_all(&cdt->cdt_waitq);
-
if (rc != 0)
CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
mdt_obd_name(mdt), current_pid(), rc);
" no error\n",
mdt_obd_name(mdt), current_pid());
- return rc;
+ RETURN(rc);
}
/**
hrd.hrd_mti = mti;
rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, hsm_restore_cb, &hrd,
- 0, 0, READ);
+ 0, 0, WRITE);
RETURN(rc);
}
}
/**
- * wake up coordinator thread
- * \param mdt [IN] device
- * \retval 0 success
- * \retval -ve failure
- */
-int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
-{
- struct coordinator *cdt = &mdt->mdt_coordinator;
- ENTRY;
-
- if (cdt->cdt_state == CDT_STOPPED)
- RETURN(-ESRCH);
-
- /* wake up coordinator */
- cdt->cdt_event = true;
- wake_up_all(&cdt->cdt_waitq);
-
- RETURN(0);
-}
-
-/**
* initialize coordinator struct
* \param mdt [IN] device
* \retval 0 success
init_rwsem(&cdt->cdt_request_lock);
mutex_init(&cdt->cdt_restore_lock);
spin_lock_init(&cdt->cdt_state_lock);
- cdt->cdt_state = CDT_STOPPED;
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
INIT_LIST_HEAD(&cdt->cdt_request_list);
INIT_LIST_HEAD(&cdt->cdt_agents);
cdt->cdt_policy = CDT_DEFAULT_POLICY;
cdt->cdt_active_req_timeout = 3600;
+ /* Initialize cdt_compound_id here to allow its usage for
+ * delayed requests from RAoLU policy */
+ atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
+
+ /* by default do not remove archives on last unlink */
+ cdt->cdt_remove_archive_on_last_unlink = false;
+
RETURN(0);
out_env:
struct coordinator *cdt = &mdt->mdt_coordinator;
int rc;
void *ptr;
- struct mdt_thread_info *cdt_mti;
struct task_struct *task;
+ int request_sz;
+ struct hsm_thread_data thread_data;
ENTRY;
/* functions defined but not yet used
CLASSERT(1 << (CDT_POLICY_SHIFT_COUNT - 1) == CDT_POLICY_LAST);
cdt->cdt_policy = CDT_DEFAULT_POLICY;
- atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
/* just need to be larger than previous one */
/* cdt_last_cookie is protected by cdt_llog_lock */
cdt->cdt_last_cookie = cfs_time_current_sec();
* /proc entries are created by the coordinator thread */
/* set up list of started restore requests */
- cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
- rc = mdt_hsm_pending_restore(cdt_mti);
+ thread_data.cdt_mti =
+ lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+ rc = mdt_hsm_pending_restore(thread_data.cdt_mti);
if (rc)
CERROR("%s: cannot take the layout locks needed"
" for registered restore: %d\n",
if (mdt->mdt_bottom->dd_rdonly)
RETURN(0);
- task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
+ /* Allocate the initial hsd.request[] vector */
+ request_sz = cdt->cdt_max_requests * sizeof(struct hsm_scan_request);
+ OBD_ALLOC_LARGE(thread_data.request, request_sz);
+ if (!thread_data.request) {
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
+ RETURN(-ENOMEM);
+ }
+
+ task = kthread_run(mdt_coordinator, &thread_data, "hsm_cdtr");
if (IS_ERR(task)) {
rc = PTR_ERR(task);
set_cdt_state(cdt, CDT_STOPPED, NULL);
+ OBD_FREE(thread_data.request, request_sz);
CERROR("%s: error starting coordinator thread: %d\n",
mdt_obd_name(mdt), rc);
} else {
+ cdt->cdt_task = task;
wait_event(cdt->cdt_waitq,
cdt->cdt_state != CDT_INIT);
- if (cdt->cdt_state == CDT_RUNNING) {
- CDEBUG(D_HSM, "%s: coordinator thread started\n",
- mdt_obd_name(mdt));
- rc = 0;
- } else {
- CDEBUG(D_HSM,
- "%s: coordinator thread failed to start\n",
- mdt_obd_name(mdt));
- rc = -EINVAL;
- }
+ CDEBUG(D_HSM, "%s: coordinator thread started\n",
+ mdt_obd_name(mdt));
+ rc = 0;
}
RETURN(rc);
ENTRY;
/* stop coordinator thread */
rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
- if (rc != 0)
- RETURN(rc);
-
- wake_up_all(&cdt->cdt_waitq);
- wait_event(cdt->cdt_waitq, cdt->cdt_state != CDT_STOPPING);
+ if (rc == 0) {
+ kthread_stop(cdt->cdt_task);
+ cdt->cdt_task = NULL;
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
+ }
RETURN(rc);
}
/* then remove request from memory list (LU-9075) */
mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
- /* ct has completed a request, so a slot is available, wakeup
- * cdt to find new work */
- mdt_hsm_cdt_wakeup(mdt);
+ /* ct has completed a request, so a slot is available,
+ * signal the coordinator to find new work */
+ mdt_hsm_cdt_event(cdt);
} else {
/* if copytool send a progress on a canceled request
* we inform copytool it should stop
if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
if (cdt->cdt_state == CDT_DISABLE) {
rc = set_cdt_state(cdt, CDT_RUNNING, NULL);
- mdt_hsm_cdt_wakeup(mdt);
+ mdt_hsm_cdt_event(cdt);
+ wake_up(&cdt->cdt_waitq);
} else {
rc = mdt_hsm_cdt_start(mdt);
}
rc = -EALREADY;
} else {
rc = mdt_hsm_cdt_stop(mdt);
- mdt_hsm_cdt_wakeup(mdt);
}
} else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
if ((cdt->cdt_state == CDT_STOPPING) ||
&cdt->cdt_other_request_mask);
}
+/**
+ * Read handler for the "remove_archive_on_last_unlink" (RAOLU) proc
+ * file: prints the current boolean setting as 0 or 1.
+ * \param m [IN] seq_file; m->private is the mdt_device
+ * \param data [IN] unused
+ * \retval 0 success
+ */
+static int mdt_hsm_cdt_raolu_seq_show(struct seq_file *m, void *data)
+{
+ struct mdt_device *mdt = m->private;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ ENTRY;
+
+ seq_printf(m, "%d\n", (int)cdt->cdt_remove_archive_on_last_unlink);
+ RETURN(0);
+}
+
+/**
+ * Write handler for the "remove_archive_on_last_unlink" (RAOLU) proc
+ * file: parses a user-supplied integer and stores it as the boolean
+ * setting. Any non-zero value enables the behavior (the assignment
+ * coerces to bool; the value is not otherwise range-checked).
+ * \param file [IN] proc file; private_data is the seq_file
+ * \param buffer [IN] user buffer holding the value to parse
+ * \param count [IN] number of bytes in \a buffer
+ * \param off [IN] unused offset
+ * \retval count success (all bytes consumed)
+ * \retval -ve parse error from lprocfs_str_to_s64()
+ */
+static ssize_t
+mdt_hsm_cdt_raolu_seq_write(struct file *file, const char __user *buffer,
+ size_t count, loff_t *off)
+
+{
+ struct seq_file *m = file->private_data;
+ struct mdt_device *mdt = m->private;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ __s64 val;
+ int rc;
+ ENTRY;
+
+ rc = lprocfs_str_to_s64(buffer, count, &val);
+ if (rc < 0)
+ RETURN(rc);
+
+ cdt->cdt_remove_archive_on_last_unlink = val;
+ RETURN(count);
+}
+
LPROC_SEQ_FOPS(mdt_hsm_cdt_loop_period);
LPROC_SEQ_FOPS(mdt_hsm_cdt_grace_delay);
LPROC_SEQ_FOPS(mdt_hsm_cdt_active_req_timeout);
LPROC_SEQ_FOPS(mdt_hsm_user_request_mask);
LPROC_SEQ_FOPS(mdt_hsm_group_request_mask);
LPROC_SEQ_FOPS(mdt_hsm_other_request_mask);
+LPROC_SEQ_FOPS(mdt_hsm_cdt_raolu);
static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
{ .name = "agents",
.fops = &mdt_hsm_group_request_mask_fops, },
{ .name = "other_request_mask",
.fops = &mdt_hsm_other_request_mask_fops, },
+ { .name = "remove_archive_on_last_unlink",
+ .fops = &mdt_hsm_cdt_raolu_fops, },
{ 0 }
};