struct coordinator *cdt = &mdt->mdt_coordinator;
ENTRY;
+ if (cdt->cdt_state == CDT_DISABLE)
+ RETURN(-ECANCELED);
+
larr = (struct llog_agent_req_rec *)hdr;
dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
switch (larr->arr_status) {
u32 start_rec_idx;
struct hsm_record_update *updates;
+ if (cdt->cdt_state == CDT_DISABLE) {
+ cdt->cdt_idle = true;
+ wake_up(&cdt->cdt_cancel_all);
+ }
/* Limit execution of the expensive requests traversal
* to at most one second. This prevents repeatedly
* locking/unlocking the catalog for each request
continue;
}
+ cdt->cdt_idle = false;
/* If no event, and no housekeeping to do, continue to
* wait. */
if (last_housekeeping + cdt->cdt_loop_period <=
hsd.hsd_one_restore = false;
rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
- &hsd, start_cat_idx, start_rec_idx,
- WRITE);
+ &hsd, start_cat_idx, start_rec_idx);
if (rc < 0)
goto clean_cb_alloc;
cdt->cdt_max_requests)
break;
+ /* If a cancel arrives during llog processing or while
+ * sending, assume the remaining records are cancelled too.
+ */
+ if (cdt->cdt_state == CDT_DISABLE)
+ goto update_recs;
+
+
rc = mdt_hsm_agent_send(mti, hal, 0);
/* if failure, we suppose it is temporary
* if the copy tool failed to do the request
}
}
+update_recs:
if (update_idx) {
rc = mdt_agent_record_update(mti, updates, update_idx);
if (rc)
larr = (struct llog_agent_req_rec *)hdr;
hai = &larr->arr_hai;
- if (hai->hai_cookie > cdt->cdt_last_cookie) {
+
+ if (hai->hai_cookie > atomic64_read(&cdt->cdt_last_cookie)) {
/* update the cookie to avoid collision */
- cdt->cdt_last_cookie = hai->hai_cookie;
+ atomic64_set(&cdt->cdt_last_cookie, hai->hai_cookie);
}
if (hai->hai_action != HSMA_RESTORE ||
hrd.hrd_mti = mti;
rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, hsm_restore_cb, &hrd,
- 0, 0, WRITE);
+ 0, 0);
if (rc < 0)
RETURN(rc);
/* no pending request found -> start a new session */
- if (!cdt->cdt_last_cookie)
- cdt->cdt_last_cookie = ktime_get_real_seconds();
+ if (!atomic64_read(&cdt->cdt_last_cookie))
+ atomic64_set(&cdt->cdt_last_cookie, ktime_get_real_seconds());
RETURN(0);
}
ENTRY;
init_waitqueue_head(&cdt->cdt_waitq);
- init_rwsem(&cdt->cdt_llog_lock);
+ init_waitqueue_head(&cdt->cdt_cancel_all);
init_rwsem(&cdt->cdt_agent_lock);
init_rwsem(&cdt->cdt_request_lock);
mutex_init(&cdt->cdt_state_lock);
/* by default do not remove archives on last unlink */
cdt->cdt_remove_archive_on_last_unlink = false;
+ cdt->cdt_idle = true;
RETURN(0);
if (rc)
GOTO(out_cdt_state_unlock, rc);
+ /* wait until the coordinator has finished its work */
+ if (wait_event_interruptible(cdt->cdt_cancel_all, cdt->cdt_idle))
+ GOTO(out_cdt_state, rc = -EINTR);
+
/* send cancel to all running requests */
down_read(&cdt->cdt_request_lock);
list_for_each_entry(car, &cdt->cdt_request_list, car_request_list) {
/* cancel all on-disk records */
rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, mdt_cancel_all_cb,
- (void *)mti, 0, 0, WRITE);
+ (void *)mti, 0, 0);
out_cdt_state:
/* Enable coordinator, unless the coordinator was stopping. */
set_cdt_state_locked(cdt, old_state);
* \param mdt [IN] MDT device
* \param cb [IN] llog callback function
* \param data [IN] llog callback data
- * \param rw [IN] cdt_llog_lock mode (READ or WRITE)
* \param start_cat_idx first catalog index to examine
* \param start_rec_idx first record index to examine
* \retval 0 success
*/
int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
llog_cb_t cb, void *data, u32 start_cat_idx,
- u32 start_rec_idx, int rw)
+ u32 start_rec_idx)
{
struct obd_device *obd = mdt2obd_dev(mdt);
struct llog_ctxt *lctxt = NULL;
- struct coordinator *cdt = &mdt->mdt_coordinator;
int rc;
ENTRY;
if (lctxt == NULL || lctxt->loc_handle == NULL)
RETURN(-ENOENT);
- if (rw == READ)
- down_read(&cdt->cdt_llog_lock);
- else
- down_write(&cdt->cdt_llog_lock);
-
rc = llog_cat_process(env, lctxt->loc_handle, cb, data, start_cat_idx,
start_rec_idx);
if (rc < 0)
llog_ctxt_put(lctxt);
- if (rw == READ)
- up_read(&cdt->cdt_llog_lock);
- else
- up_write(&cdt->cdt_llog_lock);
-
RETURN(rc);
}
if (hai->hai_action == HSMA_CANCEL)
RETURN(0);
- if (hai->hai_cookie > cdt->cdt_last_cookie)
- cdt->cdt_last_cookie = hai->hai_cookie;
+ if (hai->hai_cookie > atomic64_read(&cdt->cdt_last_cookie))
+ atomic64_set(&cdt->cdt_last_cookie, hai->hai_cookie);
RETURN(LLOG_PROC_BREAK);
}
* \param mti [IN] context
*/
static int cdt_update_last_cookie(const struct lu_env *env,
+ struct llog_ctxt *lctxt,
struct coordinator *cdt)
-__must_hold(&cdt->cdt_llog_lock)
{
- struct mdt_device *mdt;
- struct obd_device *obd;
- struct llog_ctxt *lctxt;
int rc;
- mdt = container_of(cdt, typeof(*mdt), mdt_coordinator);
- obd = mdt2obd_dev(mdt);
- lctxt = llog_get_context(obd, LLOG_AGENT_ORIG_CTXT);
- if (!lctxt || !lctxt->loc_handle)
- RETURN(-ENOENT);
-
rc = llog_cat_reverse_process(env, lctxt->loc_handle,
hsm_last_cookie_cb, cdt);
- llog_ctxt_put(lctxt);
-
if (rc < 0) {
CERROR("%s: failed to process HSM_ACTIONS llog: rc = %d\n",
- mdt_obd_name(mdt), rc);
+ lctxt->loc_obd->obd_name, rc);
RETURN(rc);
}
/* no pending request found -> start a new session */
- if (!cdt->cdt_last_cookie)
- cdt->cdt_last_cookie = ktime_get_real_seconds();
+ if (!atomic64_read(&cdt->cdt_last_cookie))
+ atomic64_set(&cdt->cdt_last_cookie, ktime_get_real_seconds());
RETURN(0);
}
if (lctxt == NULL || lctxt->loc_handle == NULL)
GOTO(free, rc = -ENOENT);
- down_write(&cdt->cdt_llog_lock);
-
/* If cdt_last_cookie is not set, try to initialize it.
* This is used by RAoLU with non-started coordinator.
*/
- if (unlikely(!cdt->cdt_last_cookie)) {
- rc = cdt_update_last_cookie(env, cdt);
+ if (unlikely(!atomic64_read(&cdt->cdt_last_cookie))) {
+ rc = cdt_update_last_cookie(env, lctxt, cdt);
if (rc < 0)
- GOTO(unlock, rc);
+ GOTO(putctxt, rc);
}
/* in case of cancel request, the cookie is already set to the
if (hai->hai_action == HSMA_CANCEL)
larr->arr_hai.hai_cookie = hai->hai_cookie;
else
- larr->arr_hai.hai_cookie = ++cdt->cdt_last_cookie;
+ larr->arr_hai.hai_cookie =
+ atomic64_inc_return(&cdt->cdt_last_cookie);
rc = llog_cat_add(env, lctxt->loc_handle, &larr->arr_hdr, NULL);
if (rc > 0)
rc = 0;
-
-unlock:
- up_write(&cdt->cdt_llog_lock);
+putctxt:
llog_ctxt_put(lctxt);
EXIT;
ducb.change_time = ktime_get_real_seconds();
rc = cdt_llog_process(env, mdt, mdt_agent_record_update_cb, &ducb,
- start_cat_idx, start_rec_idx, WRITE);
+ start_cat_idx, start_rec_idx);
if (rc < 0)
CERROR("%s: cdt_llog_process() failed, rc=%d, cannot update "
"status for %u cookies, done %u\n",
static int mdt_hsm_actions_debugfs_show(struct seq_file *s, void *v)
{
struct agent_action_iterator *aai = s->private;
- struct coordinator *cdt = &aai->aai_mdt->mdt_coordinator;
int rc;
ENTRY;
if (aai->aai_eof)
RETURN(0);
- down_read(&cdt->cdt_llog_lock);
rc = llog_cat_process(&aai->aai_env, aai->aai_ctxt->loc_handle,
hsm_actions_show_cb, s,
aai->aai_cat_index, aai->aai_index);
- up_read(&cdt->cdt_llog_lock);
if (rc == 0) /* all llog parsed */
aai->aai_eof = true;
if (rc == LLOG_PROC_BREAK) /* buffer full */
if (check)
rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, hal, 0,
- 0, READ);
+ 0);
RETURN(rc);
}
ENTRY;
/* 1st we search in recorded requests */
- rc = cdt_llog_process(env, mdt, hsm_get_action_cb, &hgad, 0, 0, READ);
+ rc = cdt_llog_process(env, mdt, hsm_get_action_cb, &hgad, 0, 0);
if (rc < 0)
RETURN(rc);
}
/* when multiple locks are needed, the lock order is
- * cdt_llog_lock
* cdt_agent_lock
* cdt_counter_lock
* cdt_request_lock
struct coordinator {
refcount_t cdt_ref; /**< cdt refcount */
wait_queue_head_t cdt_waitq; /**< cdt wait queue */
+ wait_queue_head_t cdt_cancel_all; /**< cancel_all wait */
bool cdt_event; /**< coordinator event */
struct task_struct *cdt_task; /**< cdt thread handle */
struct lu_env cdt_env; /**< coordinator lustre
__u64 cdt_policy; /**< policy flags */
enum cdt_states cdt_state; /**< state */
struct mutex cdt_state_lock; /**< cdt_state lock */
- __u64 cdt_last_cookie; /**< last cookie
+ atomic64_t cdt_last_cookie; /**< last cookie
* allocated */
- struct rw_semaphore cdt_llog_lock; /**< protect llog
- * access */
struct rw_semaphore cdt_agent_lock; /**< protect agent list */
struct rw_semaphore cdt_request_lock; /**< protect request
* list */
bool cdt_remove_archive_on_last_unlink;
bool cdt_wakeup_coordinator;
+ bool cdt_idle;
};
/* mdt state flag bits */
const struct llog_agent_req_rec *larr);
int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
llog_cb_t cb, void *data, u32 start_cat_idx,
- u32 start_rec_idx, int rw);
+ u32 start_rec_idx);
int mdt_agent_record_add(const struct lu_env *env, struct mdt_device *mdt,
__u32 archive_id, __u64 flags,
struct hsm_action_item *hai);
lgi->lgi_off += sizeof(struct llog_rec_hdr);
lgi->lgi_buf.lb_len = REC_DATA_LEN(rec);
lgi->lgi_buf.lb_buf = REC_DATA(rec);
+
+ dt_write_lock(env, o, 0);
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc == 0 && reccookie) {
reccookie->lgc_lgl = loghandle->lgh_id;
reccookie->lgc_index = idx;
rc = 1;
}
+ dt_write_unlock(env, o);
+
RETURN(rc);
}