#include <linux/kthread.h>
#include <obd_support.h>
-#include <lustre_net.h>
#include <lustre_export.h>
#include <obd.h>
#include <lprocfs_status.h>
struct hsm_scan_request *request;
};
+struct hsm_thread_data {
+ struct mdt_thread_info *cdt_mti;
+ struct hsm_scan_request *request;
+};
/**
* llog_cat_process() callback, used to:
* - find waiting request and start action
request->hal_used_sz += cfs_size_round(hai->hai_len);
request->hal->hal_count++;
+
+ if (hai->hai_action != HSMA_CANCEL)
+ cdt_agent_record_hash_add(cdt, hai->hai_cookie,
+ llh->lgh_hdr->llh_cat_idx,
+ hdr->lrh_index);
break;
}
case ARS_STARTED: {
* error may happen if coordinator crashes or stopped
* with running request
*/
- car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie, NULL);
+ car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie);
if (car == NULL) {
last = larr->arr_req_change;
} else {
* to the client, for which an error will be
* sent back, leading to an endless cycle of
* cancellation. */
+ cdt_agent_record_hash_del(cdt,
+ larr->arr_hai.hai_cookie);
RETURN(LLOG_DEL_RECORD);
}
case ARS_CANCELED:
case ARS_SUCCEED:
if ((larr->arr_req_change + cdt->cdt_grace_delay) <
- cfs_time_current_sec())
+ cfs_time_current_sec()) {
+ cdt_agent_record_hash_del(cdt,
+ larr->arr_hai.hai_cookie);
RETURN(LLOG_DEL_RECORD);
+ }
break;
}
RETURN(0);
* remove /proc entries for coordinator
* \param mdt [IN]
*/
-void hsm_cdt_procfs_fini(struct mdt_device *mdt)
+void hsm_cdt_procfs_fini(struct mdt_device *mdt)
{
- struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
- LASSERT(cdt->cdt_state == CDT_STOPPED);
if (cdt->cdt_proc_dir != NULL)
lprocfs_remove(&cdt->cdt_proc_dir);
}
return lprocfs_mdt_hsm_vars;
}
+/* Release the ressource used by the coordinator. Called when the
+ * coordinator is stopping. */
+static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
+{
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct cdt_agent_req *car, *tmp1;
+ struct hsm_agent *ha, *tmp2;
+ struct cdt_restore_handle *crh, *tmp3;
+ struct mdt_thread_info *cdt_mti;
+
+ /* start cleaning */
+ down_write(&cdt->cdt_request_lock);
+ list_for_each_entry_safe(car, tmp1, &cdt->cdt_request_list,
+ car_request_list) {
+ cfs_hash_del(cdt->cdt_request_cookie_hash,
+ &car->car_hai->hai_cookie,
+ &car->car_cookie_hash);
+ list_del(&car->car_request_list);
+ mdt_cdt_put_request(car);
+ }
+ up_write(&cdt->cdt_request_lock);
+
+ down_write(&cdt->cdt_agent_lock);
+ list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
+ list_del(&ha->ha_list);
+ OBD_FREE_PTR(ha);
+ }
+ up_write(&cdt->cdt_agent_lock);
+
+ cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+ mutex_lock(&cdt->cdt_restore_lock);
+ list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
+ list_del(&crh->crh_list);
+ /* give back layout lock */
+ mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
+ OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+ }
+ mutex_unlock(&cdt->cdt_restore_lock);
+}
+
+/*
+ * Coordinator state transition table, indexed on enum cdt_states, taking
+ * from and to states. For instance since CDT_INIT to CDT_RUNNING is a
+ * valid transition, cdt_transition[CDT_INIT][CDT_RUNNING] is true.
+ */
+static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
+ /* from -> to: stopped init running disable stopping */
+ /* stopped */ { true, true, false, false, false },
+ /* init */ { true, false, true, false, false },
+ /* running */ { false, false, true, true, true },
+ /* disable */ { false, false, true, true, true },
+ /* stopping */ { true, false, false, false, false }
+};
+
+/**
+ * Change coordinator thread state
+ * Some combinations are not valid, so catch them here.
+ *
+ * Returns 0 on success, with old_state set if not NULL, or -EINVAL if
+ * the transition was not possible.
+ */
+static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state,
+ enum cdt_states *old_state)
+{
+ int rc;
+ enum cdt_states state;
+
+ spin_lock(&cdt->cdt_state_lock);
+
+ state = cdt->cdt_state;
+
+ if (cdt_transition[state][new_state]) {
+ cdt->cdt_state = new_state;
+ spin_unlock(&cdt->cdt_state_lock);
+ if (old_state)
+ *old_state = state;
+ rc = 0;
+ } else {
+ spin_unlock(&cdt->cdt_state_lock);
+ CDEBUG(D_HSM,
+ "unexpected coordinator transition, from=%s, to=%s\n",
+ cdt_mdt_state2str(state), cdt_mdt_state2str(new_state));
+ rc = -EINVAL;
+ }
+
+ return rc;
+}
+
/**
* coordinator thread
* \param data [IN] obd device
*/
static int mdt_coordinator(void *data)
{
- struct mdt_thread_info *mti = data;
+ struct hsm_thread_data *thread_data = data;
+ struct mdt_thread_info *mti = thread_data->cdt_mti;
struct mdt_device *mdt = mti->mti_mdt;
struct coordinator *cdt = &mdt->mdt_coordinator;
struct hsm_scan_data hsd = { NULL };
+ time64_t wait_event_time = 1 * HZ;
+ time64_t last_housekeeping = 0;
int rc = 0;
int request_sz;
ENTRY;
- cdt->cdt_flags = SVC_RUNNING;
- wake_up(&cdt->cdt_waitq);
+ /* set up hsd->request and max_requests */
+ hsd.max_requests = cdt->cdt_max_requests;
+ request_sz = hsd.max_requests * sizeof(*hsd.request);
+ hsd.request = thread_data->request;
CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
mdt_obd_name(mdt), current_pid());
- /* we use a copy of cdt_max_requests in the cb, so if cdt_max_requests
- * increases due to a change from /proc we do not overflow the
- * hsd.request[] vector
- */
- hsd.max_requests = cdt->cdt_max_requests;
- request_sz = hsd.max_requests * sizeof(*hsd.request);
- OBD_ALLOC(hsd.request, request_sz);
- if (!hsd.request)
- GOTO(out, rc = -ENOMEM);
-
hsd.mti = mti;
obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
+ set_cdt_state(cdt, CDT_RUNNING, NULL);
+
+ /* Inform mdt_hsm_cdt_start(). */
+ wake_up_all(&cdt->cdt_waitq);
+
while (1) {
- struct l_wait_info lwi;
int i;
- lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
- NULL, NULL);
- l_wait_event(cdt->cdt_waitq,
- cdt->cdt_flags & (SVC_STOPPING|SVC_EVENT),
- &lwi);
+ /* Limit execution of the expensive requests traversal
+ * to at most every "wait_event_time" jiffies. This prevents
+ * repeatedly locking/unlocking the catalog for each request
+ * and preventing other HSM operations from happening */
+ wait_event_interruptible_timeout(cdt->cdt_waitq,
+ kthread_should_stop(),
+ wait_event_time);
CDEBUG(D_HSM, "coordinator resumes\n");
- if (cdt->cdt_flags & SVC_STOPPING ||
- cdt->cdt_state == CDT_STOPPING) {
- cdt->cdt_flags &= ~SVC_STOPPING;
+ if (kthread_should_stop()) {
+ CDEBUG(D_HSM, "Coordinator stops\n");
rc = 0;
break;
}
- /* wake up before timeout, new work arrives */
- if (cdt->cdt_flags & SVC_EVENT)
- cdt->cdt_flags &= ~SVC_EVENT;
-
/* if coordinator is suspended continue to wait */
if (cdt->cdt_state == CDT_DISABLE) {
CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
continue;
}
+ /* If no event, and no housekeeping to do, continue to
+ * wait. */
+ if (last_housekeeping + cdt->cdt_loop_period <= get_seconds())
+ last_housekeeping = get_seconds();
+ else if (!cdt->cdt_event)
+ continue;
+
+ cdt->cdt_event = false;
+
CDEBUG(D_HSM, "coordinator starts reading llog\n");
if (hsd.max_requests != cdt->cdt_max_requests) {
/* cdt_max_requests has changed,
* we need to allocate a new buffer
*/
- OBD_FREE(hsd.request, request_sz);
- hsd.max_requests = cdt->cdt_max_requests;
- request_sz = hsd.max_requests * sizeof(*hsd.request);
- OBD_ALLOC(hsd.request, request_sz);
- if (!hsd.request) {
- rc = -ENOMEM;
- break;
+ struct hsm_scan_request *tmp = NULL;
+ int max_requests = cdt->cdt_max_requests;
+ OBD_ALLOC_LARGE(tmp, max_requests *
+ sizeof(struct hsm_scan_request));
+ if (!tmp) {
+ CERROR("Failed to resize request buffer, "
+ "keeping it at %d\n",
+ hsd.max_requests);
+ cdt->cdt_max_requests = hsd.max_requests;
+ } else {
+ OBD_FREE_LARGE(hsd.request, request_sz);
+ hsd.max_requests = max_requests;
+ request_sz = hsd.max_requests *
+ sizeof(struct hsm_scan_request);
+ hsd.request = tmp;
}
}
hsd.request_cnt = 0;
- rc = cdt_llog_process(mti->mti_env, mdt,
- mdt_coordinator_cb, &hsd);
+ rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
+ &hsd, 0, 0, WRITE);
if (rc < 0)
goto clean_cb_alloc;
OBD_FREE(request->hal, request->hal_sz);
}
}
- EXIT;
-out:
+
if (hsd.request)
- OBD_FREE(hsd.request, request_sz);
+ OBD_FREE_LARGE(hsd.request, request_sz);
- if (cdt->cdt_state == CDT_STOPPING) {
- /* request comes from /proc path, so we need to clean cdt
- * struct */
- mdt_hsm_cdt_stop(mdt);
- mdt->mdt_opts.mo_coordinator = 0;
- } else {
- /* request comes from a thread event, generated
- * by mdt_stop_coordinator(), we have to ack
- * and cdt cleaning will be done by event sender
- */
- cdt->cdt_flags = SVC_STOPPED;
- wake_up(&cdt->cdt_waitq);
- }
+ mdt_hsm_cdt_cleanup(mdt);
if (rc != 0)
CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
" no error\n",
mdt_obd_name(mdt), current_pid());
- return rc;
+ RETURN(rc);
}
/**
hrd.hrd_mti = mti;
- rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
- hsm_restore_cb, &hrd);
+ rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, hsm_restore_cb, &hrd,
+ 0, 0, WRITE);
RETURN(rc);
}
}
/**
- * wake up coordinator thread
- * \param mdt [IN] device
- * \retval 0 success
- * \retval -ve failure
- */
-int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
-{
- struct coordinator *cdt = &mdt->mdt_coordinator;
- ENTRY;
-
- if (cdt->cdt_state == CDT_STOPPED)
- RETURN(-ESRCH);
-
- /* wake up coordinator */
- cdt->cdt_flags = SVC_EVENT;
- wake_up(&cdt->cdt_waitq);
-
- RETURN(0);
-}
-
-/**
* initialize coordinator struct
* \param mdt [IN] device
* \retval 0 success
int rc;
ENTRY;
- cdt->cdt_state = CDT_STOPPED;
-
init_waitqueue_head(&cdt->cdt_waitq);
- mutex_init(&cdt->cdt_llog_lock);
+ init_rwsem(&cdt->cdt_llog_lock);
init_rwsem(&cdt->cdt_agent_lock);
init_rwsem(&cdt->cdt_request_lock);
mutex_init(&cdt->cdt_restore_lock);
+ spin_lock_init(&cdt->cdt_state_lock);
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
- INIT_LIST_HEAD(&cdt->cdt_requests);
+ INIT_LIST_HEAD(&cdt->cdt_request_list);
INIT_LIST_HEAD(&cdt->cdt_agents);
INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
+ cdt->cdt_request_cookie_hash = cfs_hash_create("REQUEST_COOKIE_HASH",
+ CFS_HASH_BITS_MIN,
+ CFS_HASH_BITS_MAX,
+ CFS_HASH_BKT_BITS,
+ 0 /* extra bytes */,
+ CFS_HASH_MIN_THETA,
+ CFS_HASH_MAX_THETA,
+ &cdt_request_cookie_hash_ops,
+ CFS_HASH_DEFAULT);
+ if (cdt->cdt_request_cookie_hash == NULL)
+ RETURN(-ENOMEM);
+
+ cdt->cdt_agent_record_hash = cfs_hash_create("AGENT_RECORD_HASH",
+ CFS_HASH_BITS_MIN,
+ CFS_HASH_BITS_MAX,
+ CFS_HASH_BKT_BITS,
+ 0 /* extra bytes */,
+ CFS_HASH_MIN_THETA,
+ CFS_HASH_MAX_THETA,
+ &cdt_agent_record_hash_ops,
+ CFS_HASH_DEFAULT);
+ if (cdt->cdt_agent_record_hash == NULL)
+ GOTO(out_request_cookie_hash, rc = -ENOMEM);
+
rc = lu_env_init(&cdt->cdt_env, LCT_MD_THREAD);
if (rc < 0)
- RETURN(rc);
+ GOTO(out_agent_record_hash, rc);
/* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
rc = lu_context_init(&cdt->cdt_session, LCT_SERVER_SESSION);
- if (rc == 0) {
- lu_context_enter(&cdt->cdt_session);
- cdt->cdt_env.le_ses = &cdt->cdt_session;
- } else {
- lu_env_fini(&cdt->cdt_env);
- RETURN(rc);
- }
+ if (rc < 0)
+ GOTO(out_env, rc);
+
+ lu_context_enter(&cdt->cdt_session);
+ cdt->cdt_env.le_ses = &cdt->cdt_session;
cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
LASSERT(cdt_mti != NULL);
cdt->cdt_policy = CDT_DEFAULT_POLICY;
cdt->cdt_active_req_timeout = 3600;
+ /* Initialize cdt_compound_id here to allow its usage for
+ * delayed requests from RAoLU policy */
+ atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
+
+ /* by default do not remove archives on last unlink */
+ cdt->cdt_remove_archive_on_last_unlink = false;
+
RETURN(0);
+
+out_env:
+ lu_env_fini(&cdt->cdt_env);
+out_agent_record_hash:
+ cfs_hash_putref(cdt->cdt_agent_record_hash);
+ cdt->cdt_agent_record_hash = NULL;
+out_request_cookie_hash:
+ cfs_hash_putref(cdt->cdt_request_cookie_hash);
+ cdt->cdt_request_cookie_hash = NULL;
+
+ return rc;
}
/**
lu_env_fini(&cdt->cdt_env);
+ cfs_hash_putref(cdt->cdt_agent_record_hash);
+ cdt->cdt_agent_record_hash = NULL;
+
+ cfs_hash_putref(cdt->cdt_request_cookie_hash);
+ cdt->cdt_request_cookie_hash = NULL;
+
RETURN(0);
}
struct coordinator *cdt = &mdt->mdt_coordinator;
int rc;
void *ptr;
- struct mdt_thread_info *cdt_mti;
struct task_struct *task;
+ int request_sz;
+ struct hsm_thread_data thread_data;
ENTRY;
/* functions defined but not yet used
*/
ptr = dump_requests;
- if (cdt->cdt_state != CDT_STOPPED) {
- CERROR("%s: Coordinator already started\n",
+ rc = set_cdt_state(cdt, CDT_INIT, NULL);
+ if (rc) {
+ CERROR("%s: Coordinator already started or stopping\n",
mdt_obd_name(mdt));
RETURN(-EALREADY);
}
CLASSERT(1 << (CDT_POLICY_SHIFT_COUNT - 1) == CDT_POLICY_LAST);
cdt->cdt_policy = CDT_DEFAULT_POLICY;
- cdt->cdt_state = CDT_INIT;
-
- atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
/* just need to be larger than previous one */
/* cdt_last_cookie is protected by cdt_llog_lock */
cdt->cdt_last_cookie = cfs_time_current_sec();
* /proc entries are created by the coordinator thread */
/* set up list of started restore requests */
- cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
- rc = mdt_hsm_pending_restore(cdt_mti);
+ thread_data.cdt_mti =
+ lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+ rc = mdt_hsm_pending_restore(thread_data.cdt_mti);
if (rc)
CERROR("%s: cannot take the layout locks needed"
" for registered restore: %d\n",
if (mdt->mdt_bottom->dd_rdonly)
RETURN(0);
- task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
+ /* Allocate the initial hsd.request[] vector*/
+ request_sz = cdt->cdt_max_requests * sizeof(struct hsm_scan_request);
+ OBD_ALLOC_LARGE(thread_data.request, request_sz);
+ if (!thread_data.request) {
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
+ RETURN(-ENOMEM);
+ }
+
+ task = kthread_run(mdt_coordinator, &thread_data, "hsm_cdtr");
if (IS_ERR(task)) {
rc = PTR_ERR(task);
- cdt->cdt_state = CDT_STOPPED;
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
+ OBD_FREE(thread_data.request, request_sz);
CERROR("%s: error starting coordinator thread: %d\n",
mdt_obd_name(mdt), rc);
- RETURN(rc);
} else {
+ cdt->cdt_task = task;
+ wait_event(cdt->cdt_waitq,
+ cdt->cdt_state != CDT_INIT);
CDEBUG(D_HSM, "%s: coordinator thread started\n",
mdt_obd_name(mdt));
rc = 0;
}
- wait_event(cdt->cdt_waitq,
- (cdt->cdt_flags & SVC_RUNNING));
-
- cdt->cdt_state = CDT_RUNNING;
- mdt->mdt_opts.mo_coordinator = 1;
- RETURN(0);
+ RETURN(rc);
}
/**
*/
int mdt_hsm_cdt_stop(struct mdt_device *mdt)
{
- struct coordinator *cdt = &mdt->mdt_coordinator;
- struct cdt_agent_req *car, *tmp1;
- struct hsm_agent *ha, *tmp2;
- struct cdt_restore_handle *crh, *tmp3;
- struct mdt_thread_info *cdt_mti;
- ENTRY;
-
- if (mdt->mdt_opts.mo_coordinator == 0)
- RETURN(0);
-
- if (cdt->cdt_state == CDT_STOPPED) {
- CERROR("%s: Coordinator already stopped\n",
- mdt_obd_name(mdt));
- RETURN(-EALREADY);
- }
-
- if (cdt->cdt_state != CDT_STOPPING) {
- /* stop coordinator thread before cleaning */
- cdt->cdt_flags = SVC_STOPPING;
- wake_up(&cdt->cdt_waitq);
- wait_event(cdt->cdt_waitq,
- cdt->cdt_flags & SVC_STOPPED);
- }
- cdt->cdt_state = CDT_STOPPED;
-
- /* start cleaning */
- down_write(&cdt->cdt_request_lock);
- list_for_each_entry_safe(car, tmp1, &cdt->cdt_requests,
- car_request_list) {
- list_del(&car->car_request_list);
- mdt_cdt_free_request(car);
- }
- up_write(&cdt->cdt_request_lock);
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ int rc;
- down_write(&cdt->cdt_agent_lock);
- list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
- list_del(&ha->ha_list);
- OBD_FREE_PTR(ha);
+ ENTRY;
+ /* stop coordinator thread */
+ rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
+ if (rc == 0) {
+ kthread_stop(cdt->cdt_task);
+ cdt->cdt_task = NULL;
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
}
- up_write(&cdt->cdt_agent_lock);
- cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
- mutex_lock(&cdt->cdt_restore_lock);
- list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
- struct mdt_object *child;
+ RETURN(rc);
+}
- /* give back layout lock */
- child = mdt_object_find(&cdt->cdt_env, mdt, &crh->crh_fid);
- if (!IS_ERR(child))
- mdt_object_unlock_put(cdt_mti, child, &crh->crh_lh, 1);
+static int mdt_hsm_set_exists(struct mdt_thread_info *mti,
+ const struct lu_fid *fid,
+ u32 archive_id)
+{
+ struct mdt_object *obj;
+ struct md_hsm mh;
+ int rc;
- list_del(&crh->crh_list);
+ obj = mdt_hsm_get_md_hsm(mti, fid, &mh);
+ if (IS_ERR(obj))
+ GOTO(out, rc = PTR_ERR(obj));
- OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
- }
- mutex_unlock(&cdt->cdt_restore_lock);
+ if (mh.mh_flags & HS_EXISTS &&
+ mh.mh_arch_id == archive_id)
+ GOTO(out_obj, rc = 0);
- mdt->mdt_opts.mo_coordinator = 0;
+ mh.mh_flags |= HS_EXISTS;
+ mh.mh_arch_id = archive_id;
+ rc = mdt_hsm_attr_set(mti, obj, &mh);
- RETURN(0);
+out_obj:
+ mdt_object_put(mti->mti_env, obj);
+out:
+ return rc;
}
/**
}
/* find the running request to set it canceled */
- car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
+ car = mdt_cdt_find_request(cdt, hai->hai_cookie);
if (car != NULL) {
car->car_canceled = 1;
/* uuid has to be changed to the one running the
}
if (hai->hai_action == HSMA_ARCHIVE) {
- struct mdt_object *obj;
- struct md_hsm hsm;
-
- obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
- if (IS_ERR(obj) && (PTR_ERR(obj) == -ENOENT))
+ rc = mdt_hsm_set_exists(mti, &hai->hai_fid,
+ hal->hal_archive_id);
+ if (rc == -ENOENT)
continue;
- if (IS_ERR(obj))
- GOTO(out, rc = PTR_ERR(obj));
-
- hsm.mh_flags |= HS_EXISTS;
- hsm.mh_arch_id = hal->hal_archive_id;
- rc = mdt_hsm_attr_set(mti, obj, &hsm);
- mdt_object_put(mti->mti_env, obj);
- if (rc)
+ else if (rc < 0)
GOTO(out, rc);
}
/* restore in data FID done, we swap the layouts
* only if restore is successful */
- if (pgs->hpk_errval == 0 && !IS_ERR_OR_NULL(obj)) {
+ if (pgs->hpk_errval == 0 && !IS_ERR(obj)) {
rc = hsm_swap_layouts(mti, obj, &car->car_hai->hai_dfid,
&mh);
if (rc) {
/* then remove request from memory list (LU-9075) */
mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
- /* ct has completed a request, so a slot is available, wakeup
- * cdt to find new work */
- mdt_hsm_cdt_wakeup(mdt);
+ /* ct has completed a request, so a slot is available,
+ * signal the coordinator to find new work */
+ mdt_hsm_cdt_event(cdt);
} else {
/* if copytool send a progress on a canceled request
* we inform copytool it should stop
struct hsm_action_item *hai;
struct hsm_cancel_all_data hcad;
int hal_sz = 0, hal_len, rc;
- enum cdt_states save_state;
+ enum cdt_states old_state;
ENTRY;
rc = lu_env_init(&env, LCT_MD_THREAD);
hsm_init_ucred(mdt_ucred(mti));
/* disable coordinator */
- save_state = cdt->cdt_state;
- cdt->cdt_state = CDT_DISABLE;
+ rc = set_cdt_state(cdt, CDT_DISABLE, &old_state);
+ if (rc)
+ RETURN(rc);
/* send cancel to all running requests */
down_read(&cdt->cdt_request_lock);
- list_for_each_entry(car, &cdt->cdt_requests, car_request_list) {
+ list_for_each_entry(car, &cdt->cdt_request_list, car_request_list) {
mdt_cdt_get_request(car);
/* request is not yet removed from list, it will be done
* when copytool will return progress
/* cancel all on-disk records */
hcad.mdt = mdt;
- rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
- mdt_cancel_all_cb, &hcad);
+ rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, mdt_cancel_all_cb,
+ &hcad, 0, 0, WRITE);
out_cdt_state:
- /* enable coordinator */
- cdt->cdt_state = save_state;
+ /* Enable coordinator, unless the coordinator was stopping. */
+ set_cdt_state(cdt, old_state, NULL);
lu_context_exit(&session);
lu_context_fini(&session);
out_env:
/**
* check if a request is compatible with file status
* \param hai [IN] request description
- * \param hal_an [IN] request archive number (not used)
+ * \param archive_id [IN] request archive id
* \param rq_flags [IN] request flags
* \param hsm [IN] file HSM metadata
* \retval boolean
*/
bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
- const int hal_an, const __u64 rq_flags,
+ u32 archive_id, u64 rq_flags,
const struct md_hsm *hsm)
{
int is_compat = false;
if (!(hsm_flags & HS_NOARCHIVE) &&
(hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED)))
is_compat = true;
+
+ if (hsm_flags & HS_EXISTS &&
+ archive_id != 0 &&
+ archive_id != hsm->mh_arch_id)
+ is_compat = false;
+
break;
case HSMA_RESTORE:
if (!(hsm_flags & HS_DIRTY) && (hsm_flags & HS_RELEASED) &&
rc = 0;
if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
if (cdt->cdt_state == CDT_DISABLE) {
- cdt->cdt_state = CDT_RUNNING;
- mdt_hsm_cdt_wakeup(mdt);
+ rc = set_cdt_state(cdt, CDT_RUNNING, NULL);
+ mdt_hsm_cdt_event(cdt);
+ wake_up(&cdt->cdt_waitq);
} else {
rc = mdt_hsm_cdt_start(mdt);
}
mdt_obd_name(mdt));
rc = -EALREADY;
} else {
- cdt->cdt_state = CDT_STOPPING;
+ rc = mdt_hsm_cdt_stop(mdt);
}
} else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
if ((cdt->cdt_state == CDT_STOPPING) ||
mdt_obd_name(mdt));
rc = -EINVAL;
} else {
- cdt->cdt_state = CDT_DISABLE;
+ rc = set_cdt_state(cdt, CDT_DISABLE, NULL);
}
} else if (strcmp(kernbuf, CDT_PURGE_CMD) == 0) {
rc = hsm_cancel_all_actions(mdt);
cdt = &(mdt_dev(obd->obd_lu_dev)->mdt_coordinator);
- if (cdt->cdt_state == CDT_INIT)
- seq_printf(m, "init\n");
- else if (cdt->cdt_state == CDT_RUNNING)
- seq_printf(m, "enabled\n");
- else if (cdt->cdt_state == CDT_STOPPING)
- seq_printf(m, "stopping\n");
- else if (cdt->cdt_state == CDT_STOPPED)
- seq_printf(m, "stopped\n");
- else if (cdt->cdt_state == CDT_DISABLE)
- seq_printf(m, "disabled\n");
- else
- seq_printf(m, "unknown\n");
+ seq_printf(m, "%s\n", cdt_mdt_state2str(cdt->cdt_state));
RETURN(0);
}
&cdt->cdt_other_request_mask);
}
+static int mdt_hsm_cdt_raolu_seq_show(struct seq_file *m, void *data)
+{
+ struct mdt_device *mdt = m->private;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ ENTRY;
+
+ seq_printf(m, "%d\n", (int)cdt->cdt_remove_archive_on_last_unlink);
+ RETURN(0);
+}
+
+static ssize_t
+mdt_hsm_cdt_raolu_seq_write(struct file *file, const char __user *buffer,
+ size_t count, loff_t *off)
+
+{
+ struct seq_file *m = file->private_data;
+ struct mdt_device *mdt = m->private;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ __s64 val;
+ int rc;
+ ENTRY;
+
+ rc = lprocfs_str_to_s64(buffer, count, &val);
+ if (rc < 0)
+ RETURN(rc);
+
+ cdt->cdt_remove_archive_on_last_unlink = val;
+ RETURN(count);
+}
+
LPROC_SEQ_FOPS(mdt_hsm_cdt_loop_period);
LPROC_SEQ_FOPS(mdt_hsm_cdt_grace_delay);
LPROC_SEQ_FOPS(mdt_hsm_cdt_active_req_timeout);
LPROC_SEQ_FOPS(mdt_hsm_user_request_mask);
LPROC_SEQ_FOPS(mdt_hsm_group_request_mask);
LPROC_SEQ_FOPS(mdt_hsm_other_request_mask);
+LPROC_SEQ_FOPS(mdt_hsm_cdt_raolu);
static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
{ .name = "agents",
.fops = &mdt_hsm_group_request_mask_fops, },
{ .name = "other_request_mask",
.fops = &mdt_hsm_other_request_mask_fops, },
+ { .name = "remove_archive_on_last_unlink",
+ .fops = &mdt_hsm_cdt_raolu_fops, },
{ 0 }
};