* Copyright (c) 2011, 2012 Commissariat a l'energie atomique et aux energies
* alternatives
*
- * Copyright (c) 2013, 2016, Intel Corporation.
+ * Copyright (c) 2013, 2017, Intel Corporation.
* Use is subject to license terms.
*/
/*
};
struct hsm_scan_data {
- struct mdt_thread_info *mti;
- char fs_name[MTI_NAME_MAXLEN+1];
- /* request to be send to agents */
- int max_requests; /** vector size */
- int request_cnt; /** used count */
- struct hsm_scan_request *request;
+ struct mdt_thread_info *hsd_mti; /* thread info used by the scan (gives mti_mdt and mti_env) */
+ char hsd_fsname[MTI_NAME_MAXLEN + 1]; /* fs name copied into each new hal_fsname */
+ /* Are we scanning the logs for housekeeping, or just looking
+ * for new work? When false, the scan callbacks return early for
+ * records that are not ARS_WAITING, so no timeout handling and
+ * no purge of finished records takes place.
+ */
+ bool hsd_housekeeping;
+ int hsd_action_count; /* action items gathered by the current scan */
+ int hsd_request_len; /* array alloc len */
+ int hsd_request_count; /* array used count */
+ struct hsm_scan_request *hsd_request; /* requests built by the scan callback */
};
-/**
- * llog_cat_process() callback, used to:
- * - find waiting request and start action
- * - purge canceled and done requests
- * \param env [IN] environment
- * \param llh [IN] llog handle
- * \param hdr [IN] llog record
- * \param data [IN/OUT] cb data = struct hsm_scan_data
- * \retval 0 success
- * \retval -ve failure
- */
-static int mdt_coordinator_cb(const struct lu_env *env,
+static int mdt_cdt_waiting_cb(const struct lu_env *env,
+ struct mdt_device *mdt,
struct llog_handle *llh,
- struct llog_rec_hdr *hdr,
- void *data)
+ struct llog_agent_req_rec *larr,
+ struct hsm_scan_data *hsd)
{
- struct llog_agent_req_rec *larr;
- struct hsm_scan_data *hsd;
- struct hsm_action_item *hai;
- struct mdt_device *mdt;
- struct coordinator *cdt;
- int rc;
- ENTRY;
-
- hsd = data;
- mdt = hsd->mti->mti_mdt;
- cdt = &mdt->mdt_coordinator;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct hsm_scan_request *request;
+ struct hsm_action_item *hai;
+ int i;
- larr = (struct llog_agent_req_rec *)hdr;
- dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
- switch (larr->arr_status) {
- case ARS_WAITING: {
- int i;
- struct hsm_scan_request *request;
+ /* Handle one ARS_WAITING record: add its action item to the
+ * request carrying the same compound id, creating that
+ * request first if this scan has not seen it yet.
+ *
+ * Are agents full? Actions already gathered by this scan are
+ * counted on top of the requests currently accounted in
+ * cdt_request_count. */
+ if (hsd->hsd_action_count + atomic_read(&cdt->cdt_request_count) >=
+ cdt->cdt_max_requests) {
+ if (hsd->hsd_housekeeping) {
+ /* No room for another action. This is a
+ * housekeeping pass, so skip the record and
+ * let the scan go on over the remaining
+ * entries. */
+ RETURN(0);
+ } else {
+ /* We cannot send any more requests, stop
+ * here. There might be more known requests
+ * that could be merged, but this avoids
+ * analyzing too many llogs for minor
+ * gains. */
+ RETURN(LLOG_PROC_BREAK);
+ }
+ }
- /* Are agents full? */
- if (atomic_read(&cdt->cdt_request_count) >=
- cdt->cdt_max_requests)
+ /* first search whether the request is found in the list we
+ * have built. */
+ request = NULL;
+ for (i = 0; i < hsd->hsd_request_count; i++) {
+ if (hsd->hsd_request[i].hal->hal_compound_id ==
+ larr->arr_compound_id) {
+ request = &hsd->hsd_request[i];
break;
-
- /* first search whether the request is found in the
- * list we have built. */
- request = NULL;
- for (i = 0; i < hsd->request_cnt; i++) {
- if (hsd->request[i].hal->hal_compound_id ==
- larr->arr_compound_id) {
- request = &hsd->request[i];
- break;
- }
}
+ }
- if (!request) {
- struct hsm_action_list *hal;
+ if (!request) {
+ struct hsm_action_list *hal;
- if (hsd->request_cnt == hsd->max_requests)
- /* Unknown request and no more room
- * for a new request. Continue to scan
- * to find other entries for already
- * existing requests.
- */
+ if (hsd->hsd_request_count == hsd->hsd_request_len) {
+ /* Unknown request and the request array is
+ * full: same policy as for full agents above. */
+ if (hsd->hsd_housekeeping)
RETURN(0);
+ else
+ RETURN(LLOG_PROC_BREAK);
+ }
- request = &hsd->request[hsd->request_cnt];
+ request = &hsd->hsd_request[hsd->hsd_request_count];
- /* allocates hai vector size just needs to be large
- * enough */
- request->hal_sz =
- sizeof(*request->hal) +
- cfs_size_round(MTI_NAME_MAXLEN+1) +
- 2 * cfs_size_round(larr->arr_hai.hai_len);
- OBD_ALLOC(hal, request->hal_sz);
- if (!hal)
- RETURN(-ENOMEM);
- hal->hal_version = HAL_VERSION;
- strlcpy(hal->hal_fsname, hsd->fs_name,
- MTI_NAME_MAXLEN + 1);
- hal->hal_compound_id = larr->arr_compound_id;
- hal->hal_archive_id = larr->arr_archive_id;
- hal->hal_flags = larr->arr_flags;
- hal->hal_count = 0;
- request->hal_used_sz = hal_size(hal);
- request->hal = hal;
- hsd->request_cnt++;
- hai = hai_first(hal);
- } else {
- /* request is known */
- /* we check if record archive num is the same as the
- * known request, if not we will serve it in multiple
- * time because we do not know if the agent can serve
- * multiple backend
- * a use case is a compound made of multiple restore
- * where the files are not archived in the same backend
- */
- if (larr->arr_archive_id !=
- request->hal->hal_archive_id)
- RETURN(0);
+ /* allocate the hal buffer; it only needs to be large
+ * enough: header, fs name and room for two action items
+ * of this record's size */
+ request->hal_sz = sizeof(*request->hal) +
+ cfs_size_round(MTI_NAME_MAXLEN + 1) +
+ 2 * cfs_size_round(larr->arr_hai.hai_len);
+ OBD_ALLOC(hal, request->hal_sz);
+ if (!hal)
+ RETURN(-ENOMEM);
- if (request->hal_sz <
- request->hal_used_sz +
- cfs_size_round(larr->arr_hai.hai_len)) {
- /* Not enough room, need an extension */
- void *hal_buffer;
- int sz;
-
- sz = 2 * request->hal_sz;
- OBD_ALLOC(hal_buffer, sz);
- if (!hal_buffer)
- RETURN(-ENOMEM);
- memcpy(hal_buffer, request->hal,
- request->hal_used_sz);
- OBD_FREE(request->hal,
- request->hal_sz);
- request->hal = hal_buffer;
- request->hal_sz = sz;
- }
- hai = hai_first(request->hal);
- for (i = 0; i < request->hal->hal_count; i++)
- hai = hai_next(hai);
+ hal->hal_version = HAL_VERSION;
+ strlcpy(hal->hal_fsname, hsd->hsd_fsname, MTI_NAME_MAXLEN + 1);
+ hal->hal_compound_id = larr->arr_compound_id;
+ hal->hal_archive_id = larr->arr_archive_id;
+ hal->hal_flags = larr->arr_flags;
+ hal->hal_count = 0;
+ request->hal_used_sz = hal_size(hal);
+ request->hal = hal;
+ hsd->hsd_request_count++;
+ hai = hai_first(hal);
+ } else {
+ /* The request is known.
+ * Only merge the record if its archive id matches the one
+ * of the known request; otherwise skip it, so that it is
+ * served separately, because we do not know whether the
+ * agent can serve multiple backends. A use case is a
+ * compound made of multiple restores where the files are
+ * not archived in the same backend. */
+ if (larr->arr_archive_id != request->hal->hal_archive_id)
+ RETURN(0);
+
+ if (request->hal_sz < request->hal_used_sz +
+ cfs_size_round(larr->arr_hai.hai_len)) {
+ /* Not enough room, need an extension: move the
+ * hal into a buffer twice as large */
+ void *hal_buffer;
+ int sz;
+
+ sz = 2 * request->hal_sz;
+ OBD_ALLOC(hal_buffer, sz);
+ if (!hal_buffer)
+ RETURN(-ENOMEM);
+ memcpy(hal_buffer, request->hal, request->hal_used_sz);
+ OBD_FREE(request->hal, request->hal_sz);
+ request->hal = hal_buffer;
+ request->hal_sz = sz;
}
- memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
- hai->hai_cookie = larr->arr_hai.hai_cookie;
- hai->hai_gid = larr->arr_hai.hai_gid;
- request->hal_used_sz += cfs_size_round(hai->hai_len);
- request->hal->hal_count++;
+ hai = hai_first(request->hal); /* walk past the items already stored */
+ for (i = 0; i < request->hal->hal_count; i++)
+ hai = hai_next(hai);
+ }
- if (hai->hai_action != HSMA_CANCEL)
- cdt_agent_record_hash_add(cdt, hai->hai_cookie,
- llh->lgh_hdr->llh_cat_idx,
- hdr->lrh_index);
- break;
+ memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len); /* append the action item */
+ hai->hai_cookie = larr->arr_hai.hai_cookie;
+ hai->hai_gid = larr->arr_hai.hai_gid;
+
+ request->hal_used_sz += cfs_size_round(hai->hai_len);
+ request->hal->hal_count++;
+
+ hsd->hsd_action_count++; /* counted in the "agents full" check above */
+
+ if (hai->hai_action != HSMA_CANCEL) /* record catalog/record index by cookie */
+ cdt_agent_record_hash_add(cdt, hai->hai_cookie,
+ llh->lgh_hdr->llh_cat_idx,
+ larr->arr_hdr.lrh_index);
+
+ RETURN(0);
+}
+
+static int mdt_cdt_started_cb(const struct lu_env *env,
+ struct mdt_device *mdt,
+ struct llog_handle *llh,
+ struct llog_agent_req_rec *larr,
+ struct hsm_scan_data *hsd)
+{
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct hsm_action_item *hai = &larr->arr_hai;
+ struct cdt_agent_req *car;
+ time64_t now = ktime_get_real_seconds();
+ time64_t last;
+ int cl_flags;
+ int rc;
+
+ if (!hsd->hsd_housekeeping) /* timeouts are only handled on housekeeping passes */
+ RETURN(0);
+
+ /* Handle one ARS_STARTED record: cancel it if it has been
+ * running for too long.
+ *
+ * Look for the matching in-memory request. It may be missing
+ * if the coordinator crashed or was stopped with running
+ * requests; the llog record timestamp is used in that case.
+ */
+ car = mdt_cdt_find_request(cdt, hai->hai_cookie);
+ if (car == NULL) {
+ last = larr->arr_req_change;
+ } else {
+ last = car->car_req_update;
}
- case ARS_STARTED: {
- struct hsm_progress_kernel pgs;
- struct cdt_agent_req *car;
- cfs_time_t now = cfs_time_current_sec();
- cfs_time_t last;
- /* we search for a running request
- * error may happen if coordinator crashes or stopped
- * with running request
- */
- car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie);
- if (car == NULL) {
- last = larr->arr_req_change;
- } else {
- last = car->car_req_update;
- mdt_cdt_put_request(car);
- }
+ /* Check whether the request has been running for longer than
+ * cdt_active_req_timeout; if so cancel it the same way the
+ * copy tool acknowledges a cancel request. */
+ if (now <= last + cdt->cdt_active_req_timeout)
+ GOTO(out_car, rc = 0);
- /* test if request too long, if yes cancel it
- * the same way the copy tool acknowledge a cancel request */
- if (now <= last + cdt->cdt_active_req_timeout)
- RETURN(0);
+ dump_llog_agent_req_rec("request timed out, start cleaning", larr);
- dump_llog_agent_req_rec("request timed out, start cleaning",
- larr);
- /* a too old cancel request just needs to be removed
- * this can happen, if copy tool does not support
- * cancel for other requests, we have to remove the
- * running request and notify the copytool */
- pgs.hpk_fid = larr->arr_hai.hai_fid;
- pgs.hpk_cookie = larr->arr_hai.hai_cookie;
- pgs.hpk_extent = larr->arr_hai.hai_extent;
- pgs.hpk_flags = HP_FLAG_COMPLETED;
- pgs.hpk_errval = ENOSYS;
- pgs.hpk_data_version = 0;
-
- /* update request state, but do not record in llog, to
- * avoid deadlock on cdt_llog_lock */
- rc = mdt_hsm_update_request_state(hsd->mti, &pgs, 0);
- if (rc)
- CERROR("%s: cannot cleanup timed out request: "
- DFID" for cookie %#llx action=%s\n",
- mdt_obd_name(mdt),
- PFID(&pgs.hpk_fid), pgs.hpk_cookie,
- hsm_copytool_action2name(
- larr->arr_hai.hai_action));
-
- if (rc == -ENOENT) {
- /* The request no longer exists, forget
- * about it, and do not send a cancel request
- * to the client, for which an error will be
- * sent back, leading to an endless cycle of
- * cancellation. */
- cdt_agent_record_hash_del(cdt,
- larr->arr_hai.hai_cookie);
- RETURN(LLOG_DEL_RECORD);
- }
+ if (car != NULL) {
+ car->car_req_update = now;
+ mdt_hsm_agent_update_statistics(cdt, 0, 1, 0, &car->car_uuid);
+ /* Remove car from memory list (LU-9075); the reference
+ * taken by the lookup above is dropped at out_car. */
+ mdt_cdt_remove_request(cdt, hai->hai_cookie);
+ }
- /* XXX A cancel request cannot be cancelled. */
- if (larr->arr_hai.hai_action == HSMA_CANCEL)
- RETURN(0);
+ /* Emit a changelog record for the failed action, flagged
+ * with ECANCELED. */
+ cl_flags = 0;
+ hsm_set_cl_error(&cl_flags, ECANCELED);
- larr->arr_status = ARS_CANCELED;
- larr->arr_req_change = now;
- rc = llog_write(hsd->mti->mti_env, llh, hdr, hdr->lrh_index);
- if (rc < 0)
- CERROR("%s: cannot update agent log: rc = %d\n",
- mdt_obd_name(mdt), rc);
+ switch (hai->hai_action) {
+ case HSMA_ARCHIVE:
+ hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
+ break;
+ case HSMA_RESTORE:
+ hsm_set_cl_event(&cl_flags, HE_RESTORE);
+ break;
+ case HSMA_REMOVE:
+ hsm_set_cl_event(&cl_flags, HE_REMOVE);
+ break;
+ case HSMA_CANCEL:
+ hsm_set_cl_event(&cl_flags, HE_CANCEL);
+ break;
+ default:
+ /* Unknown record type, skip changelog. */
+ cl_flags = 0;
break;
}
- case ARS_FAILED:
- case ARS_CANCELED:
- case ARS_SUCCEED:
+
+ if (cl_flags != 0)
+ mo_changelog(env, CL_HSM, cl_flags, mdt->mdt_child,
+ &hai->hai_fid);
+
+ if (hai->hai_action == HSMA_RESTORE) /* drop the restore handle kept for this FID */
+ cdt_restore_handle_del(hsd->hsd_mti, cdt, &hai->hai_fid);
+
+ larr->arr_status = ARS_CANCELED; /* rewrite the record in place as canceled */
+ larr->arr_req_change = now;
+ rc = llog_write(hsd->hsd_mti->mti_env, llh, &larr->arr_hdr,
+ larr->arr_hdr.lrh_index);
+ if (rc < 0) {
+ CERROR("%s: cannot update agent log: rc = %d\n",
+ mdt_obd_name(mdt), rc);
+ rc = LLOG_DEL_RECORD; /* NOTE(review): the write error is replaced by a record deletion -- confirm this is intended */
+ }
+
+ /* The timed-out request has been canceled, so a slot may be
+ * available, signal the coordinator to find new work */
+ mdt_hsm_cdt_event(cdt);
+out_car:
+ if (car != NULL)
+ mdt_cdt_put_request(car);
+
+ RETURN(rc);
+}
+
+/**
+ * llog_cat_process() callback, dispatching on the record status:
+ * - ARS_WAITING records are handed to mdt_cdt_waiting_cb()
+ * - ARS_STARTED records are handed to mdt_cdt_started_cb()
+ * - any other record is purged, on housekeeping passes only, once
+ * it is older than cdt_grace_delay
+ * \param env [IN] environment
+ * \param llh [IN] llog handle
+ * \param hdr [IN] llog record, a struct llog_agent_req_rec
+ * \param data [IN/OUT] cb data = struct hsm_scan_data
+ * \retval 0 success
+ * \retval LLOG_DEL_RECORD the record is to be deleted
+ * \retval LLOG_PROC_BREAK passed up from mdt_cdt_waiting_cb()
+ * \retval -ve failure
+ */
+static int mdt_coordinator_cb(const struct lu_env *env,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *hdr,
+ void *data)
+{
+ struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
+ struct hsm_scan_data *hsd = data;
+ struct mdt_device *mdt = hsd->hsd_mti->mti_mdt;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ ENTRY;
+
+ larr = (struct llog_agent_req_rec *)hdr; /* NOTE(review): redundant, larr is already initialized above */
+ dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
+ switch (larr->arr_status) {
+ case ARS_WAITING:
+ RETURN(mdt_cdt_waiting_cb(env, mdt, llh, larr, hsd));
+ case ARS_STARTED:
+ RETURN(mdt_cdt_started_cb(env, mdt, llh, larr, hsd));
+ default:
+ if (!hsd->hsd_housekeeping) /* purging is housekeeping work only */
+ RETURN(0);
+
if ((larr->arr_req_change + cdt->cdt_grace_delay) <
- cfs_time_current_sec()) {
+ ktime_get_real_seconds()) {
cdt_agent_record_hash_del(cdt,
larr->arr_hai.hai_cookie);
RETURN(LLOG_DEL_RECORD);
}
- break;
+
+ RETURN(0);
}
- RETURN(0);
}
/**
cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
mutex_lock(&cdt->cdt_restore_lock);
- list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
+ list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_handle_list,
+ crh_list) {
list_del(&crh->crh_list);
/* give back layout lock */
mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
/* from -> to: stopped init running disable stopping */
/* stopped */ { true, true, false, false, false },
- /* init */ { true, false, true, false, true },
+ /* init */ { true, false, true, false, false },
/* running */ { false, false, true, true, true },
/* disable */ { false, false, true, true, true },
- /* stopping */ { true, false, false, false, true }
+ /* stopping */ { true, false, false, false, false }
};
/**
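- * Returns 0 on success, with old_state set if not NULL, or -EINVAL if
- * the transition was not possible.
+ * Returns 0 on success, or -EINVAL if the transition was not possible.
+ * The caller must hold cdt_state_lock.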
*/
-static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state,
- enum cdt_states *old_state)
+static int set_cdt_state_locked(struct coordinator *cdt,
+ enum cdt_states new_state)
{
int rc;
enum cdt_states state;
- spin_lock(&cdt->cdt_state_lock);
-
state = cdt->cdt_state;
if (cdt_transition[state][new_state]) {
cdt->cdt_state = new_state;
- spin_unlock(&cdt->cdt_state_lock);
- if (old_state)
- *old_state = state;
rc = 0;
} else {
- spin_unlock(&cdt->cdt_state_lock);
CDEBUG(D_HSM,
"unexpected coordinator transition, from=%s, to=%s\n",
cdt_mdt_state2str(state), cdt_mdt_state2str(new_state));
return rc;
}
+static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state)
+{
+ int rc; /* result of set_cdt_state_locked(), returned unchanged */
+
+ mutex_lock(&cdt->cdt_state_lock); /* locking wrapper: hold cdt_state_lock across the transition */
+ rc = set_cdt_state_locked(cdt, new_state);
+ mutex_unlock(&cdt->cdt_state_lock);
+
+ return rc;
+}
+
+
+
/**
* coordinator thread
* \param data [IN] obd device
struct mdt_device *mdt = mti->mti_mdt;
struct coordinator *cdt = &mdt->mdt_coordinator;
struct hsm_scan_data hsd = { NULL };
- int rc = 0;
- int request_sz;
+ time64_t last_housekeeping = 0;
+ size_t request_sz = 0;
+ int rc;
ENTRY;
CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
mdt_obd_name(mdt), current_pid());
- /* we use a copy of cdt_max_requests in the cb, so if cdt_max_requests
- * increases due to a change from /proc we do not overflow the
- * hsd.request[] vector
- */
- hsd.max_requests = cdt->cdt_max_requests;
- request_sz = hsd.max_requests * sizeof(*hsd.request);
- OBD_ALLOC_LARGE(hsd.request, request_sz);
- if (!hsd.request)
- GOTO(out, rc = -ENOMEM);
-
- hsd.mti = mti;
- obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
+ hsd.hsd_mti = mti;
+ obd_uuid2fsname(hsd.hsd_fsname, mdt_obd_name(mdt),
+ sizeof(hsd.hsd_fsname));
- set_cdt_state(cdt, CDT_RUNNING, NULL);
+ set_cdt_state(cdt, CDT_RUNNING);
/* Inform mdt_hsm_cdt_start(). */
wake_up_all(&cdt->cdt_waitq);
while (1) {
- struct l_wait_info lwi;
int i;
+ int update_idx = 0;
+ int updates_sz;
+ int updates_cnt;
+ struct hsm_record_update *updates;
+
+ /* Limit execution of the expensive requests traversal
+ * to at most one second. This avoids repeatedly
+ * locking/unlocking the catalog for each request,
+ * which would keep other HSM operations from happening.
+ */
+ wait_event_interruptible_timeout(cdt->cdt_waitq,
+ kthread_should_stop() ||
+ cdt->cdt_wakeup_coordinator,
+ cfs_time_seconds(1));
- lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
- NULL, NULL);
- l_wait_event(cdt->cdt_waitq,
- cdt->cdt_event || (cdt->cdt_state == CDT_STOPPING),
- &lwi);
-
+ cdt->cdt_wakeup_coordinator = false;
CDEBUG(D_HSM, "coordinator resumes\n");
- if (cdt->cdt_state == CDT_STOPPING) {
+ if (kthread_should_stop()) {
+ CDEBUG(D_HSM, "Coordinator stops\n");
rc = 0;
break;
}
- cdt->cdt_event = false;
-
/* if coordinator is suspended continue to wait */
if (cdt->cdt_state == CDT_DISABLE) {
CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
continue;
}
+ /* If no event, and no housekeeping to do, continue to
+ * wait. */
+ if (last_housekeeping + cdt->cdt_loop_period <=
+ ktime_get_real_seconds()) {
+ last_housekeeping = ktime_get_real_seconds();
+ hsd.hsd_housekeeping = true;
+ } else if (cdt->cdt_event) {
+ hsd.hsd_housekeeping = false;
+ } else {
+ continue;
+ }
+
+ cdt->cdt_event = false;
+
CDEBUG(D_HSM, "coordinator starts reading llog\n");
- if (hsd.max_requests != cdt->cdt_max_requests) {
+ if (hsd.hsd_request_len != cdt->cdt_max_requests) {
/* cdt_max_requests has changed,
* we need to allocate a new buffer
*/
- OBD_FREE_LARGE(hsd.request, request_sz);
- hsd.max_requests = cdt->cdt_max_requests;
- request_sz = hsd.max_requests * sizeof(*hsd.request);
- OBD_ALLOC_LARGE(hsd.request, request_sz);
- if (!hsd.request) {
- rc = -ENOMEM;
- break;
+ struct hsm_scan_request *tmp = NULL;
+ int max_requests = cdt->cdt_max_requests;
+ OBD_ALLOC_LARGE(tmp, max_requests *
+ sizeof(struct hsm_scan_request));
+ if (!tmp) {
+ CERROR("Failed to resize request buffer, "
+ "keeping it at %d\n",
+ hsd.hsd_request_len);
+ } else {
+ if (hsd.hsd_request != NULL)
+ OBD_FREE_LARGE(hsd.hsd_request,
+ request_sz);
+
+ hsd.hsd_request_len = max_requests;
+ request_sz = hsd.hsd_request_len *
+ sizeof(struct hsm_scan_request);
+ hsd.hsd_request = tmp;
}
}
- hsd.request_cnt = 0;
+ hsd.hsd_action_count = 0;
+ hsd.hsd_request_count = 0;
rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
&hsd, 0, 0, WRITE);
if (rc < 0)
goto clean_cb_alloc;
- CDEBUG(D_HSM, "found %d requests to send\n", hsd.request_cnt);
+ CDEBUG(D_HSM, "found %d requests to send\n",
+ hsd.hsd_request_count);
if (list_empty(&cdt->cdt_agents)) {
CDEBUG(D_HSM, "no agent available, "
goto clean_cb_alloc;
}
+ /* Compute how many HAI we have in all the requests */
+ updates_cnt = 0;
+ for (i = 0; i < hsd.hsd_request_count; i++) {
+ const struct hsm_scan_request *request =
+ &hsd.hsd_request[i];
+
+ updates_cnt += request->hal->hal_count;
+ }
+
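+ /* Allocate a temporary array to store the cookies to
+ * update, and their status.
+ * NOTE(review): the allocation-failure path below uses
+ * "continue", which skips clean_cb_alloc; the hal buffers
+ * built by the callback then look leaked once
+ * hsd_request_count is reset for the next scan -- confirm,
+ * and consider "goto clean_cb_alloc" instead. */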
+ updates_sz = updates_cnt * sizeof(*updates);
+ OBD_ALLOC(updates, updates_sz);
+ if (updates == NULL) {
+ CERROR("%s: Cannot allocate memory (%d o) "
+ "for %d updates\n",
+ mdt_obd_name(mdt), updates_sz, updates_cnt);
+ continue;
+ }
+
/* here hsd contains a list of requests to be started */
- for (i = 0; i < hsd.request_cnt; i++) {
- struct hsm_scan_request *request = &hsd.request[i];
+ for (i = 0; i < hsd.hsd_request_count; i++) {
+ struct hsm_scan_request *request = &hsd.hsd_request[i];
struct hsm_action_list *hal = request->hal;
struct hsm_action_item *hai;
- __u64 *cookies;
- int sz, j;
- enum agent_req_status status;
+ int j;
/* still room for work ? */
if (atomic_read(&cdt->cdt_request_count) >=
* if the copy tool failed to do the request
* it has to use hsm_progress
*/
- status = (rc ? ARS_WAITING : ARS_STARTED);
/* set up cookie vector to set records status
* after copy tools start or failed
*/
- sz = hal->hal_count * sizeof(__u64);
- OBD_ALLOC(cookies, sz);
- if (cookies == NULL)
- continue;
-
hai = hai_first(hal);
for (j = 0; j < hal->hal_count; j++) {
- cookies[j] = hai->hai_cookie;
+ updates[update_idx].cookie = hai->hai_cookie;
+ updates[update_idx].status =
+ (rc ? ARS_WAITING : ARS_STARTED);
hai = hai_next(hai);
+ update_idx++;
}
+ }
- rc = mdt_agent_record_update(mti->mti_env, mdt, cookies,
- hal->hal_count, status);
+ if (update_idx) {
+ rc = mdt_agent_record_update(mti->mti_env, mdt,
+ updates, update_idx);
if (rc)
CERROR("%s: mdt_agent_record_update() failed, "
- "rc=%d, cannot update status to %s "
+ "rc=%d, cannot update records "
"for %d cookies\n",
- mdt_obd_name(mdt), rc,
- agent_req_status2name(status),
- hal->hal_count);
-
- OBD_FREE(cookies, sz);
+ mdt_obd_name(mdt), rc, update_idx);
}
+
+ OBD_FREE(updates, updates_sz);
+
clean_cb_alloc:
/* free hal allocated by callback */
- for (i = 0; i < hsd.request_cnt; i++) {
- struct hsm_scan_request *request = &hsd.request[i];
+ for (i = 0; i < hsd.hsd_request_count; i++) {
+ struct hsm_scan_request *request = &hsd.hsd_request[i];
OBD_FREE(request->hal, request->hal_sz);
}
}
- EXIT;
-out:
- set_cdt_state(cdt, CDT_STOPPING, NULL);
- if (hsd.request)
- OBD_FREE_LARGE(hsd.request, request_sz);
+ if (hsd.hsd_request != NULL)
+ OBD_FREE_LARGE(hsd.hsd_request, request_sz);
mdt_hsm_cdt_cleanup(mdt);
- set_cdt_state(cdt, CDT_STOPPED, NULL);
- wake_up_all(&cdt->cdt_waitq);
-
if (rc != 0)
CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
mdt_obd_name(mdt), current_pid(), rc);
" no error\n",
mdt_obd_name(mdt), current_pid());
+ RETURN(rc);
+}
+
+int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
+ const struct lu_fid *fid,
+ const struct hsm_extent *he)
+{
+ struct cdt_restore_handle *crh;
+ struct mdt_object *obj;
+ int rc;
+ ENTRY;
+
+ OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem); /* new restore handle for this FID */
+ if (crh == NULL)
+ RETURN(-ENOMEM);
+
+ crh->crh_fid = *fid;
+ /* In V1 the whole file is restored, so he->offset is not used:
+ * the extent starts at 0 and ends at he->length. The
+ * extent-based form would be:
+ * crh->extent.start = he->offset;
+ * crh->extent.end = he->offset + he->length;
+ */
+ crh->crh_extent.start = 0;
+ crh->crh_extent.end = he->length;
+ /* get the layout lock (LCK_EX), kept in crh->crh_lh */
+ mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
+ obj = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
+ MDS_INODELOCK_LAYOUT);
+ if (IS_ERR(obj))
+ GOTO(out_crh, rc = PTR_ERR(obj));
+
+ /* We do not keep a reference on the object during the restore
+ * which can be very long; only the lock handle is retained. */
+ mdt_object_put(mti->mti_env, obj);
+
+ mutex_lock(&cdt->cdt_restore_lock);
+ if (unlikely(cdt->cdt_state == CDT_STOPPED ||
+ cdt->cdt_state == CDT_STOPPING)) {
+ mutex_unlock(&cdt->cdt_restore_lock);
+ GOTO(out_lh, rc = -EAGAIN); /* coordinator stopped or stopping: do not queue the handle */
+ }
+
+ list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+ mutex_unlock(&cdt->cdt_restore_lock);
+
+ RETURN(0);
+out_lh:
+ mdt_object_unlock(mti, NULL, &crh->crh_lh, 1); /* error path: give the layout lock back */
+out_crh:
+ OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+
return rc;
}
* \retval cdt_restore_handle found
* \retval NULL not found
*/
-struct cdt_restore_handle *mdt_hsm_restore_hdl_find(struct coordinator *cdt,
- const struct lu_fid *fid)
+struct cdt_restore_handle *cdt_restore_handle_find(struct coordinator *cdt,
+ const struct lu_fid *fid)
{
- struct cdt_restore_handle *crh;
+ struct cdt_restore_handle *crh;
ENTRY;
- list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
+ list_for_each_entry(crh, &cdt->cdt_restore_handle_list, crh_list) {
if (lu_fid_eq(&crh->crh_fid, fid))
RETURN(crh);
}
+
RETURN(NULL);
}
+void cdt_restore_handle_del(struct mdt_thread_info *mti,
+ struct coordinator *cdt, const struct lu_fid *fid)
+{
+ struct cdt_restore_handle *crh;
+
+ /* Unlink the restore handle registered for this FID, if any,
+ * under cdt_restore_lock. The layout lock it holds is given
+ * back below, once the mutex has been released. */
+ mutex_lock(&cdt->cdt_restore_lock);
+ crh = cdt_restore_handle_find(cdt, fid);
+ if (crh != NULL)
+ list_del(&crh->crh_list);
+ mutex_unlock(&cdt->cdt_restore_lock);
+
+ if (crh == NULL) /* no handle for this FID: nothing to release */
+ return;
+
+ /* XXX We pass a NULL object since the restore handle does not
+ * keep a reference on the object being restored. */
+ mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
+ OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+}
+
/**
* data passed to llog_cat_process() callback
* to scan requests and take actions
{
struct llog_agent_req_rec *larr;
struct hsm_restore_data *hrd;
- struct cdt_restore_handle *crh;
struct hsm_action_item *hai;
struct mdt_thread_info *mti;
struct coordinator *cdt;
- struct mdt_object *child;
int rc;
ENTRY;
* when being re-started */
if (larr->arr_status == ARS_STARTED) {
larr->arr_status = ARS_WAITING;
- larr->arr_req_change = cfs_time_current_sec();
+ larr->arr_req_change = ktime_get_real_seconds();
rc = llog_write(env, llh, hdr, hdr->lrh_index);
if (rc != 0)
GOTO(out, rc);
}
- OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
- if (crh == NULL)
- RETURN(-ENOMEM);
-
- crh->crh_fid = hai->hai_fid;
- /* in V1 all file is restored
- crh->extent.start = hai->hai_extent.offset;
- crh->extent.end = hai->hai_extent.offset + hai->hai_extent.length;
- */
- crh->crh_extent.start = 0;
- crh->crh_extent.end = hai->hai_extent.length;
- /* get the layout lock */
- mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
- child = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
- MDS_INODELOCK_LAYOUT);
- if (IS_ERR(child))
- GOTO(out, rc = PTR_ERR(child));
-
- rc = 0;
- /* we choose to not keep a reference
- * on the object during the restore time which can be very long */
- mdt_object_put(mti->mti_env, child);
-
- mutex_lock(&cdt->cdt_restore_lock);
- list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
- mutex_unlock(&cdt->cdt_restore_lock);
-
+ rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid, &hai->hai_extent);
out:
RETURN(rc);
}
uc->uc_umask = 0777;
uc->uc_ginfo = NULL;
uc->uc_identity = NULL;
-
- RETURN(0);
-}
-
-/**
- * wake up coordinator thread
- * \param mdt [IN] device
- * \retval 0 success
- * \retval -ve failure
- */
-int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
-{
- struct coordinator *cdt = &mdt->mdt_coordinator;
- ENTRY;
-
- if (cdt->cdt_state == CDT_STOPPED)
- RETURN(-ESRCH);
-
- /* wake up coordinator */
- cdt->cdt_event = true;
- wake_up_all(&cdt->cdt_waitq);
+ /* always record internal HSM activity if also enabled globally */
+ uc->uc_enable_audit = 1;
RETURN(0);
}
init_rwsem(&cdt->cdt_agent_lock);
init_rwsem(&cdt->cdt_request_lock);
mutex_init(&cdt->cdt_restore_lock);
- spin_lock_init(&cdt->cdt_state_lock);
- cdt->cdt_state = CDT_STOPPED;
+ mutex_init(&cdt->cdt_state_lock);
+ set_cdt_state(cdt, CDT_STOPPED);
INIT_LIST_HEAD(&cdt->cdt_request_list);
INIT_LIST_HEAD(&cdt->cdt_agents);
- INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
+ INIT_LIST_HEAD(&cdt->cdt_restore_handle_list);
cdt->cdt_request_cookie_hash = cfs_hash_create("REQUEST_COOKIE_HASH",
CFS_HASH_BITS_MIN,
/* Initialize cdt_compound_id here to allow its usage for
* delayed requests from RAoLU policy */
- atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
+ atomic_set(&cdt->cdt_compound_id, ktime_get_real_seconds());
/* by default do not remove archives on last unlink */
cdt->cdt_remove_archive_on_last_unlink = false;
static int mdt_hsm_cdt_start(struct mdt_device *mdt)
{
struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct mdt_thread_info *cdt_mti;
int rc;
void *ptr;
- struct mdt_thread_info *cdt_mti;
struct task_struct *task;
ENTRY;
*/
ptr = dump_requests;
- rc = set_cdt_state(cdt, CDT_INIT, NULL);
+ rc = set_cdt_state(cdt, CDT_INIT);
if (rc) {
CERROR("%s: Coordinator already started or stopping\n",
mdt_obd_name(mdt));
/* just need to be larger than previous one */
/* cdt_last_cookie is protected by cdt_llog_lock */
- cdt->cdt_last_cookie = cfs_time_current_sec();
+ cdt->cdt_last_cookie = ktime_get_real_seconds();
atomic_set(&cdt->cdt_request_count, 0);
+ atomic_set(&cdt->cdt_archive_count, 0);
+ atomic_set(&cdt->cdt_restore_count, 0);
+ atomic_set(&cdt->cdt_remove_count, 0);
cdt->cdt_user_request_mask = (1UL << HSMA_RESTORE);
cdt->cdt_group_request_mask = (1UL << HSMA_RESTORE);
cdt->cdt_other_request_mask = (1UL << HSMA_RESTORE);
task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
if (IS_ERR(task)) {
rc = PTR_ERR(task);
- set_cdt_state(cdt, CDT_STOPPED, NULL);
+ set_cdt_state(cdt, CDT_STOPPED);
CERROR("%s: error starting coordinator thread: %d\n",
mdt_obd_name(mdt), rc);
} else {
+ cdt->cdt_task = task;
wait_event(cdt->cdt_waitq,
cdt->cdt_state != CDT_INIT);
- if (cdt->cdt_state == CDT_RUNNING) {
- CDEBUG(D_HSM, "%s: coordinator thread started\n",
- mdt_obd_name(mdt));
- rc = 0;
- } else {
- CDEBUG(D_HSM,
- "%s: coordinator thread failed to start\n",
- mdt_obd_name(mdt));
- rc = -EINVAL;
- }
+ CDEBUG(D_HSM, "%s: coordinator thread started\n",
+ mdt_obd_name(mdt));
+ rc = 0;
}
RETURN(rc);
ENTRY;
/* stop coordinator thread */
- rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
- if (rc != 0)
- RETURN(rc);
-
- wake_up_all(&cdt->cdt_waitq);
- wait_event(cdt->cdt_waitq, cdt->cdt_state != CDT_STOPPING);
+ rc = set_cdt_state(cdt, CDT_STOPPING);
+ if (rc == 0) {
+ kthread_stop(cdt->cdt_task);
+ cdt->cdt_task = NULL;
+ set_cdt_state(cdt, CDT_STOPPED);
+ }
RETURN(rc);
}
* it will be done when updating the request status
*/
if (hai->hai_action == HSMA_CANCEL) {
+ struct hsm_record_update update = {
+ .cookie = hai->hai_cookie,
+ .status = ARS_CANCELED,
+ };
+
rc = mdt_agent_record_update(mti->mti_env, mti->mti_mdt,
- &hai->hai_cookie,
- 1, ARS_CANCELED);
+ &update, 1);
if (rc) {
CERROR("%s: mdt_agent_record_update() failed, "
"rc=%d, cannot update status to %s "
* update status of a completed request
* \param mti [IN] context
* \param pgs [IN] progress of the copy tool
- * \param update_record [IN] update llog record
* \retval 0 success
* \retval -ve failure
*/
/* rc != 0 means error when analysing action, it may come from
* a crasy CT no need to manage DIRTY
+ * and if mdt_hsm_get_md_hsm() has returned an error, mh has not been
+ * filled
*/
- if (rc == 0)
+ if (rc == 0 && !IS_ERR(obj))
hsm_set_cl_flags(&cl_flags,
mh.mh_flags & HS_DIRTY ? CLF_HSM_DIRTY : 0);
* if no retry will be attempted and if object is still alive,
* in other cases we just unlock the object */
if (car->car_hai->hai_action == HSMA_RESTORE) {
- struct cdt_restore_handle *crh;
-
/* restore in data FID done, we swap the layouts
* only if restore is successful */
if (pgs->hpk_errval == 0 && !IS_ERR(obj)) {
&car->car_hai->hai_fid);
need_changelog = false;
- /* give back layout lock */
- mutex_lock(&cdt->cdt_restore_lock);
- crh = mdt_hsm_restore_hdl_find(cdt, &car->car_hai->hai_fid);
- if (crh != NULL)
- list_del(&crh->crh_list);
- mutex_unlock(&cdt->cdt_restore_lock);
- /* Just give back layout lock, we keep the reference
- * which is given back later with the lock for HSM
- * flags.
- * XXX obj may be invalid so we do not pass it. */
- if (crh != NULL)
- mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
-
- if (crh != NULL)
- OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+ cdt_restore_handle_del(mti, cdt, &car->car_hai->hai_fid);
}
GOTO(out, rc);
* update status of a request
* \param mti [IN] context
* \param pgs [IN] progress of the copy tool
- * \param update_record [IN] update llog record
* \retval 0 success
* \retval -ve failure
*/
int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
- struct hsm_progress_kernel *pgs,
- const int update_record)
+ struct hsm_progress_kernel *pgs)
{
struct mdt_device *mdt = mti->mti_mdt;
struct coordinator *cdt = &mdt->mdt_coordinator;
hsm_init_ucred(mdt_ucred(mti));
if (pgs->hpk_flags & HP_FLAG_COMPLETED) {
- enum agent_req_status status;
+ enum agent_req_status status;
+ struct hsm_record_update update;
+ int rc1;
rc = hsm_cdt_request_completed(mti, pgs, car, &status);
- CDEBUG(D_HSM, "%s record: fid="DFID" cookie=%#llx action=%s "
+ CDEBUG(D_HSM, "updating record: fid="DFID" cookie=%#llx action=%s "
"status=%s\n",
- update_record ? "Updating" : "Not updating",
PFID(&pgs->hpk_fid), pgs->hpk_cookie,
hsm_copytool_action2name(car->car_hai->hai_action),
agent_req_status2name(status));
/* update record first (LU-9075) */
- if (update_record) {
- int rc1;
-
- rc1 = mdt_agent_record_update(mti->mti_env, mdt,
- &pgs->hpk_cookie, 1,
- status);
- if (rc1)
- CERROR("%s: mdt_agent_record_update() failed,"
- " rc=%d, cannot update status to %s"
- " for cookie %#llx\n",
- mdt_obd_name(mdt), rc1,
- agent_req_status2name(status),
- pgs->hpk_cookie);
- rc = (rc != 0 ? rc : rc1);
- }
+ update.cookie = pgs->hpk_cookie;
+ update.status = status;
+
+ rc1 = mdt_agent_record_update(mti->mti_env, mdt,
+ &update, 1);
+ if (rc1)
+ CERROR("%s: mdt_agent_record_update() failed,"
+ " rc=%d, cannot update status to %s"
+ " for cookie %#llx\n",
+ mdt_obd_name(mdt), rc1,
+ agent_req_status2name(status),
+ pgs->hpk_cookie);
+ rc = (rc != 0 ? rc : rc1);
/* then remove request from memory list (LU-9075) */
mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
- /* ct has completed a request, so a slot is available, wakeup
- * cdt to find new work */
- mdt_hsm_cdt_wakeup(mdt);
+ /* ct has completed a request, so a slot is available,
+ * signal the coordinator to find new work */
+ mdt_hsm_cdt_event(cdt);
} else {
/* if copytool send a progress on a canceled request
* we inform copytool it should stop
if (larr->arr_status == ARS_WAITING ||
larr->arr_status == ARS_STARTED) {
larr->arr_status = ARS_CANCELED;
- larr->arr_req_change = cfs_time_current_sec();
+ larr->arr_req_change = ktime_get_real_seconds();
rc = llog_write(env, llh, hdr, hdr->lrh_index);
}
hsm_init_ucred(mdt_ucred(mti));
+ mutex_lock(&cdt->cdt_state_lock);
+ old_state = cdt->cdt_state;
+
/* disable coordinator */
- rc = set_cdt_state(cdt, CDT_DISABLE, &old_state);
+ rc = set_cdt_state_locked(cdt, CDT_DISABLE);
if (rc)
- RETURN(rc);
+ GOTO(out_cdt_state_unlock, rc);
/* send cancel to all running requests */
down_read(&cdt->cdt_request_lock);
&hcad, 0, 0, WRITE);
out_cdt_state:
/* Enable coordinator, unless the coordinator was stopping. */
- set_cdt_state(cdt, old_state, NULL);
+ set_cdt_state_locked(cdt, old_state);
+out_cdt_state_unlock:
+ mutex_unlock(&cdt->cdt_state_lock);
+
lu_context_exit(&session);
lu_context_fini(&session);
out_env:
struct seq_file *m = file->private_data; \
struct mdt_device *mdt = m->private; \
struct coordinator *cdt = &mdt->mdt_coordinator; \
- __s64 val; \
- int rc; \
- ENTRY; \
+ unsigned int val; \
+ int rc; \
\
- rc = lprocfs_str_to_s64(buffer, count, &val); \
+ ENTRY; \
+ rc = kstrtouint_from_user(buffer, count, 0, &val); \
if (rc) \
RETURN(rc); \
- if (val > 0 && val < INT_MAX) { \
+ \
+ if (val != 0) { \
cdt->VAR = val; \
RETURN(count); \
} \
rc = 0;
if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
if (cdt->cdt_state == CDT_DISABLE) {
- rc = set_cdt_state(cdt, CDT_RUNNING, NULL);
- mdt_hsm_cdt_wakeup(mdt);
+ rc = set_cdt_state(cdt, CDT_RUNNING);
+ mdt_hsm_cdt_event(cdt);
+ wake_up(&cdt->cdt_waitq);
} else {
rc = mdt_hsm_cdt_start(mdt);
}
rc = -EALREADY;
} else {
rc = mdt_hsm_cdt_stop(mdt);
- mdt_hsm_cdt_wakeup(mdt);
}
} else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
if ((cdt->cdt_state == CDT_STOPPING) ||
mdt_obd_name(mdt));
rc = -EINVAL;
} else {
- rc = set_cdt_state(cdt, CDT_DISABLE, NULL);
+ rc = set_cdt_state(cdt, CDT_DISABLE);
}
} else if (strcmp(kernbuf, CDT_PURGE_CMD) == 0) {
rc = hsm_cancel_all_actions(mdt);
struct seq_file *m = file->private_data;
struct mdt_device *mdt = m->private;
struct coordinator *cdt = &mdt->mdt_coordinator;
- __s64 val;
+ bool val;
int rc;
- ENTRY;
- rc = lprocfs_str_to_s64(buffer, count, &val);
+ ENTRY;
+ rc = kstrtobool_from_user(buffer, count, &val);
if (rc < 0)
RETURN(rc);
LPROC_SEQ_FOPS(mdt_hsm_other_request_mask);
LPROC_SEQ_FOPS(mdt_hsm_cdt_raolu);
+/* Read-only proc files for request counters */
+/*
+ * seq_file show handler: print the coordinator's cdt_archive_count
+ * counter as a decimal integer followed by a newline.
+ *
+ * \param m    [IN] seq_file whose private pointer is the struct mdt_device
+ * \param data [IN] unused
+ * \retval 0 always
+ */
+static int mdt_hsm_cdt_archive_count_seq_show(struct seq_file *m, void *data)
+{
+	struct mdt_device *dev = m->private;
+	int nr_archive;
+
+	ENTRY;
+	/* snapshot the counter once, then format it */
+	nr_archive = atomic_read(&dev->mdt_coordinator.cdt_archive_count);
+	seq_printf(m, "%d\n", nr_archive);
+	RETURN(0);
+}
+
+/*
+ * seq_file show handler: print the coordinator's cdt_restore_count
+ * counter as a decimal integer followed by a newline.
+ *
+ * \param m    [IN] seq_file whose private pointer is the struct mdt_device
+ * \param data [IN] unused
+ * \retval 0 always
+ */
+static int mdt_hsm_cdt_restore_count_seq_show(struct seq_file *m, void *data)
+{
+	struct mdt_device *dev = m->private;
+	int nr_restore;
+
+	ENTRY;
+	/* snapshot the counter once, then format it */
+	nr_restore = atomic_read(&dev->mdt_coordinator.cdt_restore_count);
+	seq_printf(m, "%d\n", nr_restore);
+	RETURN(0);
+}
+
+/*
+ * seq_file show handler: print the coordinator's cdt_remove_count
+ * counter as a decimal integer followed by a newline.
+ *
+ * \param m    [IN] seq_file whose private pointer is the struct mdt_device
+ * \param data [IN] unused
+ * \retval 0 always
+ */
+static int mdt_hsm_cdt_remove_count_seq_show(struct seq_file *m, void *data)
+{
+	struct mdt_device *dev = m->private;
+	int nr_remove;
+
+	ENTRY;
+	/* snapshot the counter once, then format it */
+	nr_remove = atomic_read(&dev->mdt_coordinator.cdt_remove_count);
+	seq_printf(m, "%d\n", nr_remove);
+	RETURN(0);
+}
+
+LPROC_SEQ_FOPS_RO(mdt_hsm_cdt_archive_count); /* cf. "archive_count" entry */
+LPROC_SEQ_FOPS_RO(mdt_hsm_cdt_restore_count); /* cf. "restore_count" entry */
+LPROC_SEQ_FOPS_RO(mdt_hsm_cdt_remove_count); /* cf. "remove_count" entry */
+
static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
{ .name = "agents",
.fops = &mdt_hsm_agent_fops },
.fops = &mdt_hsm_other_request_mask_fops, },
{ .name = "remove_archive_on_last_unlink",
.fops = &mdt_hsm_cdt_raolu_fops, },
+ { .name = "archive_count",
+ .fops = &mdt_hsm_cdt_archive_count_fops, },
+ { .name = "restore_count",
+ .fops = &mdt_hsm_cdt_restore_count_fops, },
+ { .name = "remove_count",
+ .fops = &mdt_hsm_cdt_remove_count_fops, },
{ 0 }
};