Use kthread_stop() to stop and join the coordinator thread. Only after
that step can the coordinator state be set to CDT_STOPPED. As a
coordinator doesn't stop instantly, this closes a race if the
coordinator is being restarted at the same time, leaving one thread
shutting down and a new one starting up.
Test-Parameters: trivial testlist=sanity-hsm
Signed-off-by: frank zago <fzago@cray.com>
Change-Id: I0a21d0d22403a56a8965441e1b57118073b6f210
Signed-off-by: Ben Evans <bevans@cray.com>
Reviewed-on: https://review.whamcloud.com/22762
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Quentin Bouget <quentin.bouget@cea.fr>
Reviewed-by: Faccini Bruno <bruno.faccini@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
struct hsm_scan_request *request;
};
struct hsm_scan_request *request;
};
+struct hsm_thread_data {
+ struct mdt_thread_info *cdt_mti;
+ struct hsm_scan_request *request;
+};
/**
* llog_cat_process() callback, used to:
* - find waiting request and start action
/**
* llog_cat_process() callback, used to:
* - find waiting request and start action
static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
/* from -> to: stopped init running disable stopping */
/* stopped */ { true, true, false, false, false },
static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
/* from -> to: stopped init running disable stopping */
/* stopped */ { true, true, false, false, false },
- /* init */ { true, false, true, false, true },
+ /* init */ { true, false, true, false, false },
/* running */ { false, false, true, true, true },
/* disable */ { false, false, true, true, true },
/* running */ { false, false, true, true, true },
/* disable */ { false, false, true, true, true },
- /* stopping */ { true, false, false, false, true }
+ /* stopping */ { true, false, false, false, false }
*/
static int mdt_coordinator(void *data)
{
*/
static int mdt_coordinator(void *data)
{
- struct mdt_thread_info *mti = data;
+ struct hsm_thread_data *thread_data = data;
+ struct mdt_thread_info *mti = thread_data->cdt_mti;
struct mdt_device *mdt = mti->mti_mdt;
struct coordinator *cdt = &mdt->mdt_coordinator;
struct hsm_scan_data hsd = { NULL };
struct mdt_device *mdt = mti->mti_mdt;
struct coordinator *cdt = &mdt->mdt_coordinator;
struct hsm_scan_data hsd = { NULL };
- CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
- mdt_obd_name(mdt), current_pid());
-
- /* we use a copy of cdt_max_requests in the cb, so if cdt_max_requests
- * increases due to a change from /proc we do not overflow the
- * hsd.request[] vector
- */
+ /* set up hsd->request and max_requests */
hsd.max_requests = cdt->cdt_max_requests;
request_sz = hsd.max_requests * sizeof(*hsd.request);
hsd.max_requests = cdt->cdt_max_requests;
request_sz = hsd.max_requests * sizeof(*hsd.request);
- OBD_ALLOC_LARGE(hsd.request, request_sz);
- if (!hsd.request)
- GOTO(out, rc = -ENOMEM);
+ hsd.request = thread_data->request;
+
+ CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
+ mdt_obd_name(mdt), current_pid());
hsd.mti = mti;
obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
hsd.mti = mti;
obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
NULL, NULL);
l_wait_event(cdt->cdt_waitq,
lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
NULL, NULL);
l_wait_event(cdt->cdt_waitq,
- cdt->cdt_event || (cdt->cdt_state == CDT_STOPPING),
+ cdt->cdt_event || kthread_should_stop(),
&lwi);
CDEBUG(D_HSM, "coordinator resumes\n");
&lwi);
CDEBUG(D_HSM, "coordinator resumes\n");
- if (cdt->cdt_state == CDT_STOPPING) {
+ if (kthread_should_stop()) {
/* cdt_max_requests has changed,
* we need to allocate a new buffer
*/
/* cdt_max_requests has changed,
* we need to allocate a new buffer
*/
- OBD_FREE_LARGE(hsd.request, request_sz);
- hsd.max_requests = cdt->cdt_max_requests;
- request_sz = hsd.max_requests * sizeof(*hsd.request);
- OBD_ALLOC_LARGE(hsd.request, request_sz);
- if (!hsd.request) {
- rc = -ENOMEM;
- break;
+ struct hsm_scan_request *tmp = NULL;
+ int max_requests = cdt->cdt_max_requests;
+ OBD_ALLOC_LARGE(tmp, max_requests *
+ sizeof(struct hsm_scan_request));
+ if (!tmp) {
+ CERROR("Failed to resize request buffer, "
+ "keeping it at %d\n",
+ hsd.max_requests);
+ cdt->cdt_max_requests = hsd.max_requests;
+ } else {
+ OBD_FREE_LARGE(hsd.request, request_sz);
+ hsd.max_requests = max_requests;
+ request_sz = hsd.max_requests *
+ sizeof(struct hsm_scan_request);
+ hsd.request = tmp;
OBD_FREE(request->hal, request->hal_sz);
}
}
OBD_FREE(request->hal, request->hal_sz);
}
}
- EXIT;
-out:
- set_cdt_state(cdt, CDT_STOPPING, NULL);
if (hsd.request)
OBD_FREE_LARGE(hsd.request, request_sz);
mdt_hsm_cdt_cleanup(mdt);
if (hsd.request)
OBD_FREE_LARGE(hsd.request, request_sz);
mdt_hsm_cdt_cleanup(mdt);
- set_cdt_state(cdt, CDT_STOPPED, NULL);
- wake_up_all(&cdt->cdt_waitq);
-
if (rc != 0)
CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
mdt_obd_name(mdt), current_pid(), rc);
if (rc != 0)
CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
mdt_obd_name(mdt), current_pid(), rc);
" no error\n",
mdt_obd_name(mdt), current_pid());
" no error\n",
mdt_obd_name(mdt), current_pid());
init_rwsem(&cdt->cdt_request_lock);
mutex_init(&cdt->cdt_restore_lock);
spin_lock_init(&cdt->cdt_state_lock);
init_rwsem(&cdt->cdt_request_lock);
mutex_init(&cdt->cdt_restore_lock);
spin_lock_init(&cdt->cdt_state_lock);
- cdt->cdt_state = CDT_STOPPED;
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
INIT_LIST_HEAD(&cdt->cdt_request_list);
INIT_LIST_HEAD(&cdt->cdt_agents);
INIT_LIST_HEAD(&cdt->cdt_request_list);
INIT_LIST_HEAD(&cdt->cdt_agents);
struct coordinator *cdt = &mdt->mdt_coordinator;
int rc;
void *ptr;
struct coordinator *cdt = &mdt->mdt_coordinator;
int rc;
void *ptr;
- struct mdt_thread_info *cdt_mti;
struct task_struct *task;
struct task_struct *task;
+ int request_sz;
+ struct hsm_thread_data thread_data;
ENTRY;
/* functions defined but not yet used
ENTRY;
/* functions defined but not yet used
* /proc entries are created by the coordinator thread */
/* set up list of started restore requests */
* /proc entries are created by the coordinator thread */
/* set up list of started restore requests */
- cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
- rc = mdt_hsm_pending_restore(cdt_mti);
+ thread_data.cdt_mti =
+ lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+ rc = mdt_hsm_pending_restore(thread_data.cdt_mti);
if (rc)
CERROR("%s: cannot take the layout locks needed"
" for registered restore: %d\n",
if (rc)
CERROR("%s: cannot take the layout locks needed"
" for registered restore: %d\n",
if (mdt->mdt_bottom->dd_rdonly)
RETURN(0);
if (mdt->mdt_bottom->dd_rdonly)
RETURN(0);
- task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
+ /* Allocate the initial hsd.request[] vector*/
+ request_sz = cdt->cdt_max_requests * sizeof(struct hsm_scan_request);
+ OBD_ALLOC_LARGE(thread_data.request, request_sz);
+ if (!thread_data.request) {
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
+ RETURN(-ENOMEM);
+ }
+
+ task = kthread_run(mdt_coordinator, &thread_data, "hsm_cdtr");
if (IS_ERR(task)) {
rc = PTR_ERR(task);
set_cdt_state(cdt, CDT_STOPPED, NULL);
if (IS_ERR(task)) {
rc = PTR_ERR(task);
set_cdt_state(cdt, CDT_STOPPED, NULL);
+ OBD_FREE(thread_data.request, request_sz);
CERROR("%s: error starting coordinator thread: %d\n",
mdt_obd_name(mdt), rc);
} else {
CERROR("%s: error starting coordinator thread: %d\n",
mdt_obd_name(mdt), rc);
} else {
wait_event(cdt->cdt_waitq,
cdt->cdt_state != CDT_INIT);
wait_event(cdt->cdt_waitq,
cdt->cdt_state != CDT_INIT);
- if (cdt->cdt_state == CDT_RUNNING) {
- CDEBUG(D_HSM, "%s: coordinator thread started\n",
- mdt_obd_name(mdt));
- rc = 0;
- } else {
- CDEBUG(D_HSM,
- "%s: coordinator thread failed to start\n",
- mdt_obd_name(mdt));
- rc = -EINVAL;
- }
+ CDEBUG(D_HSM, "%s: coordinator thread started\n",
+ mdt_obd_name(mdt));
+ rc = 0;
ENTRY;
/* stop coordinator thread */
rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
ENTRY;
/* stop coordinator thread */
rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
- if (rc != 0)
- RETURN(rc);
-
- wake_up_all(&cdt->cdt_waitq);
- wait_event(cdt->cdt_waitq, cdt->cdt_state != CDT_STOPPING);
+ if (rc == 0) {
+ kthread_stop(cdt->cdt_task);
+ cdt->cdt_task = NULL;
+ set_cdt_state(cdt, CDT_STOPPED, NULL);
+ }
rc = -EALREADY;
} else {
rc = mdt_hsm_cdt_stop(mdt);
rc = -EALREADY;
} else {
rc = mdt_hsm_cdt_stop(mdt);
- mdt_hsm_cdt_wakeup(mdt);
}
} else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
if ((cdt->cdt_state == CDT_STOPPING) ||
}
} else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
if ((cdt->cdt_state == CDT_STOPPING) ||
struct coordinator {
wait_queue_head_t cdt_waitq; /**< cdt wait queue */
bool cdt_event; /**< coordinator event */
struct coordinator {
wait_queue_head_t cdt_waitq; /**< cdt wait queue */
bool cdt_event; /**< coordinator event */
+ struct task_struct *cdt_task; /**< cdt thread handle */
struct lu_env cdt_env; /**< coordinator lustre
* env */
struct lu_context cdt_session; /** session for lu_ucred */
struct lu_env cdt_env; /**< coordinator lustre
* env */
struct lu_context cdt_session; /** session for lu_ucred */