Whamcloud - gitweb
LU-10383 hsm: add action count to hsm scan data
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
index 0e5e8f4..b240afe 100644 (file)
@@ -136,244 +136,263 @@ struct hsm_scan_request {
 };
 
 struct hsm_scan_data {
-       struct mdt_thread_info          *mti;
-       char                             fs_name[MTI_NAME_MAXLEN+1];
+       struct mdt_thread_info  *hsd_mti;
+       char                     hsd_fsname[MTI_NAME_MAXLEN + 1];
        /* are we scanning the logs for housekeeping, or just looking
         * for new work?
         */
-       bool                             housekeeping;
-       /* request to be send to agents */
-       int                              max_requests;  /** vector size */
-       int                              request_cnt;   /** used count */
-       struct hsm_scan_request         *request;
+       bool                     hsd_housekeeping;
+       int                      hsd_action_count;
+       int                      hsd_request_len; /* array alloc len */
+       int                      hsd_request_count; /* array used count */
+       struct hsm_scan_request *hsd_request;
 };
 
-struct hsm_thread_data {
-       struct mdt_thread_info  *cdt_mti;
-       struct hsm_scan_request *request;
-};
-/**
- *  llog_cat_process() callback, used to:
- *  - find waiting request and start action
- *  - purge canceled and done requests
- * \param env [IN] environment
- * \param llh [IN] llog handle
- * \param hdr [IN] llog record
- * \param data [IN/OUT] cb data = struct hsm_scan_data
- * \retval 0 success
- * \retval -ve failure
- */
-static int mdt_coordinator_cb(const struct lu_env *env,
+static int mdt_cdt_waiting_cb(const struct lu_env *env,
+                             struct mdt_device *mdt,
                              struct llog_handle *llh,
-                             struct llog_rec_hdr *hdr,
-                             void *data)
+                             struct llog_agent_req_rec *larr,
+                             struct hsm_scan_data *hsd)
 {
-       struct llog_agent_req_rec       *larr;
-       struct hsm_scan_data            *hsd;
-       struct hsm_action_item          *hai;
-       struct mdt_device               *mdt;
-       struct coordinator              *cdt;
-       int                              rc;
-       ENTRY;
-
-       hsd = data;
-       mdt = hsd->mti->mti_mdt;
-       cdt = &mdt->mdt_coordinator;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       struct hsm_scan_request *request;
+       struct hsm_action_item *hai;
+       int i;
 
-       larr = (struct llog_agent_req_rec *)hdr;
-       dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
-       switch (larr->arr_status) {
-       case ARS_WAITING: {
-               int i;
-               struct hsm_scan_request *request;
+       /* Are agents full? */
+       if (hsd->hsd_action_count + atomic_read(&cdt->cdt_request_count) >=
+           cdt->cdt_max_requests) {
+               if (hsd->hsd_housekeeping) {
+                       /* Unknown request and no more room for a new
+                        * request. Continue to scan to find other
+                        * entries for already existing requests. */
+                       RETURN(0);
+               } else {
+                       /* We cannot send any more requests, stop
+                        * here. There might be more known requests
+                        * that could be merged, but this avoids
+                        * analyzing too many llogs for minor
+                        * gains. */
+                       RETURN(LLOG_PROC_BREAK);
+               }
+       }
 
-               /* Are agents full? */
-               if (atomic_read(&cdt->cdt_request_count) >=
-                   cdt->cdt_max_requests)
+       /* first search whether the request is found in the list we
+        * have built. */
+       request = NULL;
+       for (i = 0; i < hsd->hsd_request_count; i++) {
+               if (hsd->hsd_request[i].hal->hal_compound_id ==
+                   larr->arr_compound_id) {
+                       request = &hsd->hsd_request[i];
                        break;
+               }
+       }
 
-               /* first search whether the request is found in the
-                * list we have built. */
-               request = NULL;
-               for (i = 0; i < hsd->request_cnt; i++) {
-                       if (hsd->request[i].hal->hal_compound_id ==
-                           larr->arr_compound_id) {
-                               request = &hsd->request[i];
-                               break;
-                       }
+       if (!request) {
+               struct hsm_action_list *hal;
+
+               if (hsd->hsd_request_count == hsd->hsd_request_len) {
+                       /* Logic as above. */
+                       if (hsd->hsd_housekeeping)
+                               RETURN(0);
+                       else
+                               RETURN(LLOG_PROC_BREAK);
                }
 
-               if (!request) {
-                       struct hsm_action_list *hal;
-
-                       if (hsd->request_cnt == hsd->max_requests) {
-                               if (!hsd->housekeeping) {
-                                       /* The request array is full,
-                                        * stop here. There might be
-                                        * more known requests that
-                                        * could be merged, but this
-                                        * avoid analyzing too many
-                                        * llogs for minor gains.
-                                        */
-                                       RETURN(LLOG_PROC_BREAK);
-                               } else {
-                                       /* Unknown request and no more room
-                                        * for a new request. Continue to scan
-                                        * to find other entries for already
-                                        * existing requests.
-                                        */
-                                       RETURN(0);
-                               }
-                       }
+               request = &hsd->hsd_request[hsd->hsd_request_count];
 
-                       request = &hsd->request[hsd->request_cnt];
+               /* allocate the hai vector; its size just needs to be
+                * large enough */
+               request->hal_sz = sizeof(*request->hal) +
+                       cfs_size_round(MTI_NAME_MAXLEN + 1) +
+                       2 * cfs_size_round(larr->arr_hai.hai_len);
+               OBD_ALLOC(hal, request->hal_sz);
+               if (!hal)
+                       RETURN(-ENOMEM);
 
-                       /* allocates hai vector size just needs to be large
-                        * enough */
-                       request->hal_sz =
-                               sizeof(*request->hal) +
-                               cfs_size_round(MTI_NAME_MAXLEN+1) +
-                               2 * cfs_size_round(larr->arr_hai.hai_len);
-                       OBD_ALLOC(hal, request->hal_sz);
-                       if (!hal)
-                               RETURN(-ENOMEM);
-                       hal->hal_version = HAL_VERSION;
-                       strlcpy(hal->hal_fsname, hsd->fs_name,
-                               MTI_NAME_MAXLEN + 1);
-                       hal->hal_compound_id = larr->arr_compound_id;
-                       hal->hal_archive_id = larr->arr_archive_id;
-                       hal->hal_flags = larr->arr_flags;
-                       hal->hal_count = 0;
-                       request->hal_used_sz = hal_size(hal);
-                       request->hal = hal;
-                       hsd->request_cnt++;
-                       hai = hai_first(hal);
-               } else {
-                       /* request is known */
-                       /* we check if record archive num is the same as the
-                        * known request, if not we will serve it in multiple
-                        * time because we do not know if the agent can serve
-                        * multiple backend
-                        * a use case is a compound made of multiple restore
-                        * where the files are not archived in the same backend
-                        */
-                       if (larr->arr_archive_id !=
-                           request->hal->hal_archive_id)
-                               RETURN(0);
+               hal->hal_version = HAL_VERSION;
+               strlcpy(hal->hal_fsname, hsd->hsd_fsname, MTI_NAME_MAXLEN + 1);
+               hal->hal_compound_id = larr->arr_compound_id;
+               hal->hal_archive_id = larr->arr_archive_id;
+               hal->hal_flags = larr->arr_flags;
+               hal->hal_count = 0;
+               request->hal_used_sz = hal_size(hal);
+               request->hal = hal;
+               hsd->hsd_request_count++;
+               hai = hai_first(hal);
+       } else {
+               /* request is known */
+               /* we check if the record archive num is the same as the
+                * known request; if not, we will serve it in several
+                * passes because we do not know if the agent can serve
+                * multiple backends. A use case is a compound made of
+                * multiple restores where the files are not archived
+                * in the same backend */
+               if (larr->arr_archive_id != request->hal->hal_archive_id)
+                       RETURN(0);
 
-                       if (request->hal_sz <
-                           request->hal_used_sz +
-                           cfs_size_round(larr->arr_hai.hai_len)) {
-                               /* Not enough room, need an extension */
-                               void *hal_buffer;
-                               int sz;
-
-                               sz = 2 * request->hal_sz;
-                               OBD_ALLOC(hal_buffer, sz);
-                               if (!hal_buffer)
-                                       RETURN(-ENOMEM);
-                               memcpy(hal_buffer, request->hal,
-                                      request->hal_used_sz);
-                               OBD_FREE(request->hal,
-                                        request->hal_sz);
-                               request->hal = hal_buffer;
-                               request->hal_sz = sz;
-                       }
-                       hai = hai_first(request->hal);
-                       for (i = 0; i < request->hal->hal_count; i++)
-                               hai = hai_next(hai);
-               }
-               memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
-               hai->hai_cookie = larr->arr_hai.hai_cookie;
-               hai->hai_gid = larr->arr_hai.hai_gid;
+               if (request->hal_sz < request->hal_used_sz +
+                   cfs_size_round(larr->arr_hai.hai_len)) {
+                       /* Not enough room, need an extension */
+                       void *hal_buffer;
+                       int sz;
 
-               request->hal_used_sz += cfs_size_round(hai->hai_len);
-               request->hal->hal_count++;
+                       sz = 2 * request->hal_sz;
+                       OBD_ALLOC(hal_buffer, sz);
+                       if (!hal_buffer)
+                               RETURN(-ENOMEM);
+                       memcpy(hal_buffer, request->hal, request->hal_used_sz);
+                       OBD_FREE(request->hal, request->hal_sz);
+                       request->hal = hal_buffer;
+                       request->hal_sz = sz;
+               }
 
-               if (hai->hai_action != HSMA_CANCEL)
-                       cdt_agent_record_hash_add(cdt, hai->hai_cookie,
-                                                 llh->lgh_hdr->llh_cat_idx,
-                                                 hdr->lrh_index);
-               break;
+               hai = hai_first(request->hal);
+               for (i = 0; i < request->hal->hal_count; i++)
+                       hai = hai_next(hai);
        }
-       case ARS_STARTED: {
-               struct hsm_progress_kernel pgs;
-               struct cdt_agent_req *car;
-               time64_t now = ktime_get_real_seconds();
-               time64_t last;
 
-               if (!hsd->housekeeping)
-                       break;
+       memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
+       hai->hai_cookie = larr->arr_hai.hai_cookie;
+       hai->hai_gid = larr->arr_hai.hai_gid;
 
-               /* we search for a running request
-                * error may happen if coordinator crashes or stopped
-                * with running request
-                */
-               car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie);
-               if (car == NULL) {
-                       last = larr->arr_req_change;
-               } else {
-                       last = car->car_req_update;
-                       mdt_cdt_put_request(car);
-               }
+       request->hal_used_sz += cfs_size_round(hai->hai_len);
+       request->hal->hal_count++;
 
-               /* test if request too long, if yes cancel it
-                * the same way the copy tool acknowledge a cancel request */
-               if (now <= last + cdt->cdt_active_req_timeout)
-                       RETURN(0);
+       hsd->hsd_action_count++;
 
-               dump_llog_agent_req_rec("request timed out, start cleaning",
-                                       larr);
-               /* a too old cancel request just needs to be removed
-                * this can happen, if copy tool does not support
-                * cancel for other requests, we have to remove the
-                * running request and notify the copytool */
-               pgs.hpk_fid = larr->arr_hai.hai_fid;
-               pgs.hpk_cookie = larr->arr_hai.hai_cookie;
-               pgs.hpk_extent = larr->arr_hai.hai_extent;
-               pgs.hpk_flags = HP_FLAG_COMPLETED;
-               pgs.hpk_errval = ENOSYS;
-               pgs.hpk_data_version = 0;
-
-               /* update request state, but do not record in llog, to
-                * avoid deadlock on cdt_llog_lock */
-               rc = mdt_hsm_update_request_state(hsd->mti, &pgs, 0);
-               if (rc)
-                       CERROR("%s: cannot cleanup timed out request: "
-                              DFID" for cookie %#llx action=%s\n",
-                              mdt_obd_name(mdt),
-                              PFID(&pgs.hpk_fid), pgs.hpk_cookie,
-                              hsm_copytool_action2name(
-                                      larr->arr_hai.hai_action));
-
-               if (rc == -ENOENT) {
-                       /* The request no longer exists, forget
-                        * about it, and do not send a cancel request
-                        * to the client, for which an error will be
-                        * sent back, leading to an endless cycle of
-                        * cancellation. */
-                       cdt_agent_record_hash_del(cdt,
-                                                 larr->arr_hai.hai_cookie);
-                       RETURN(LLOG_DEL_RECORD);
-               }
+       if (hai->hai_action != HSMA_CANCEL)
+               cdt_agent_record_hash_add(cdt, hai->hai_cookie,
+                                         llh->lgh_hdr->llh_cat_idx,
+                                         larr->arr_hdr.lrh_index);
 
-               /* XXX A cancel request cannot be cancelled. */
-               if (larr->arr_hai.hai_action == HSMA_CANCEL)
-                       RETURN(0);
+       RETURN(0);
+}
 
-               larr->arr_status = ARS_CANCELED;
-               larr->arr_req_change = now;
-               rc = llog_write(hsd->mti->mti_env, llh, hdr, hdr->lrh_index);
-               if (rc < 0)
-                       CERROR("%s: cannot update agent log: rc = %d\n",
-                              mdt_obd_name(mdt), rc);
+static int mdt_cdt_started_cb(const struct lu_env *env,
+                             struct mdt_device *mdt,
+                             struct llog_handle *llh,
+                             struct llog_agent_req_rec *larr,
+                             struct hsm_scan_data *hsd)
+{
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       struct hsm_action_item *hai = &larr->arr_hai;
+       struct cdt_agent_req *car;
+       time64_t now = ktime_get_real_seconds();
+       time64_t last;
+       int cl_flags;
+       int rc;
+
+       if (!hsd->hsd_housekeeping)
+               RETURN(0);
+
+       /* we search for a running request;
+        * an error may happen if the coordinator crashed or was stopped
+        * with a running request
+        */
+       car = mdt_cdt_find_request(cdt, hai->hai_cookie);
+       if (car == NULL) {
+               last = larr->arr_req_change;
+       } else {
+               last = car->car_req_update;
+       }
+
+       /* test if the request has run too long; if so, cancel it
+        * the same way the copy tool acknowledges a cancel request */
+       if (now <= last + cdt->cdt_active_req_timeout)
+               GOTO(out_car, rc = 0);
+
+       dump_llog_agent_req_rec("request timed out, start cleaning", larr);
+
+       if (car != NULL) {
+               car->car_req_update = now;
+               mdt_hsm_agent_update_statistics(cdt, 0, 1, 0, &car->car_uuid);
+               /* Remove car from memory list (LU-9075) */
+               mdt_cdt_remove_request(cdt, hai->hai_cookie);
+       }
+
+       /* Emit a changelog record for the failed action. */
+       cl_flags = 0;
+       hsm_set_cl_error(&cl_flags, ECANCELED);
+
+       switch (hai->hai_action) {
+       case HSMA_ARCHIVE:
+               hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
+               break;
+       case HSMA_RESTORE:
+               hsm_set_cl_event(&cl_flags, HE_RESTORE);
+               break;
+       case HSMA_REMOVE:
+               hsm_set_cl_event(&cl_flags, HE_REMOVE);
+               break;
+       case HSMA_CANCEL:
+               hsm_set_cl_event(&cl_flags, HE_CANCEL);
+               break;
+       default:
+               /* Unknown record type, skip changelog. */
+               cl_flags = 0;
                break;
        }
-       case ARS_FAILED:
-       case ARS_CANCELED:
-       case ARS_SUCCEED:
-               if (!hsd->housekeeping)
-                       break;
+
+       if (cl_flags != 0)
+               mo_changelog(env, CL_HSM, cl_flags, mdt->mdt_child,
+                            &hai->hai_fid);
+
+       if (hai->hai_action == HSMA_RESTORE)
+               cdt_restore_handle_del(hsd->hsd_mti, cdt, &hai->hai_fid);
+
+       larr->arr_status = ARS_CANCELED;
+       larr->arr_req_change = now;
+       rc = llog_write(hsd->hsd_mti->mti_env, llh, &larr->arr_hdr,
+                       larr->arr_hdr.lrh_index);
+       if (rc < 0) {
+               CERROR("%s: cannot update agent log: rc = %d\n",
+                      mdt_obd_name(mdt), rc);
+               rc = LLOG_DEL_RECORD;
+       }
+
+       /* ct has completed a request, so a slot is available,
+        * signal the coordinator to find new work */
+       mdt_hsm_cdt_event(cdt);
+out_car:
+       if (car != NULL)
+               mdt_cdt_put_request(car);
+
+       RETURN(rc);
+}
+
+/**
+ *  llog_cat_process() callback, used to:
+ *  - find waiting request and start action
+ *  - purge canceled and done requests
+ * \param env [IN] environment
+ * \param llh [IN] llog handle
+ * \param hdr [IN] llog record
+ * \param data [IN/OUT] cb data = struct hsm_scan_data
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int mdt_coordinator_cb(const struct lu_env *env,
+                             struct llog_handle *llh,
+                             struct llog_rec_hdr *hdr,
+                             void *data)
+{
+       struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
+       struct hsm_scan_data *hsd = data;
+       struct mdt_device *mdt = hsd->hsd_mti->mti_mdt;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       ENTRY;
+
+       larr = (struct llog_agent_req_rec *)hdr;
+       dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
+       switch (larr->arr_status) {
+       case ARS_WAITING:
+               RETURN(mdt_cdt_waiting_cb(env, mdt, llh, larr, hsd));
+       case ARS_STARTED:
+               RETURN(mdt_cdt_started_cb(env, mdt, llh, larr, hsd));
+       default:
+               if (!hsd->hsd_housekeeping)
+                       RETURN(0);
 
                if ((larr->arr_req_change + cdt->cdt_grace_delay) <
                    ktime_get_real_seconds()) {
@@ -381,9 +400,9 @@ static int mdt_coordinator_cb(const struct lu_env *env,
                                                  larr->arr_hai.hai_cookie);
                        RETURN(LLOG_DEL_RECORD);
                }
-               break;
+
+               RETURN(0);
        }
-       RETURN(0);
 }
 
 /**
@@ -466,7 +485,8 @@ static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
 
        cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
        mutex_lock(&cdt->cdt_restore_lock);
-       list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
+       list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_handle_list,
+                                crh_list) {
                list_del(&crh->crh_list);
                /* give back layout lock */
                mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
@@ -496,24 +516,18 @@ static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
  * Returns 0 on success, with old_state set if not NULL, or -EINVAL if
  * the transition was not possible.
  */
-static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state,
-                        enum cdt_states *old_state)
+static int set_cdt_state_locked(struct coordinator *cdt,
+                               enum cdt_states new_state)
 {
        int rc;
        enum cdt_states state;
 
-       spin_lock(&cdt->cdt_state_lock);
-
        state = cdt->cdt_state;
 
        if (cdt_transition[state][new_state]) {
                cdt->cdt_state = new_state;
-               spin_unlock(&cdt->cdt_state_lock);
-               if (old_state)
-                       *old_state = state;
                rc = 0;
        } else {
-               spin_unlock(&cdt->cdt_state_lock);
                CDEBUG(D_HSM,
                       "unexpected coordinator transition, from=%s, to=%s\n",
                       cdt_mdt_state2str(state), cdt_mdt_state2str(new_state));
@@ -523,6 +537,19 @@ static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state,
        return rc;
 }
 
+static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state)
+{
+       int rc;
+
+       mutex_lock(&cdt->cdt_state_lock);
+       rc = set_cdt_state_locked(cdt, new_state);
+       mutex_unlock(&cdt->cdt_state_lock);
+
+       return rc;
+}
+
+
+
 /**
  * coordinator thread
  * \param data [IN] obd device
@@ -531,28 +558,23 @@ static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state,
  */
 static int mdt_coordinator(void *data)
 {
-       struct hsm_thread_data  *thread_data = data;
-       struct mdt_thread_info  *mti = thread_data->cdt_mti;
+       struct mdt_thread_info  *mti = data;
        struct mdt_device       *mdt = mti->mti_mdt;
        struct coordinator      *cdt = &mdt->mdt_coordinator;
        struct hsm_scan_data     hsd = { NULL };
        time64_t                 last_housekeeping = 0;
-       int                      rc = 0;
-       int                      request_sz;
+       size_t request_sz = 0;
+       int rc;
        ENTRY;
 
-       /* set up hsd->request and max_requests */
-       hsd.max_requests = cdt->cdt_max_requests;
-       request_sz = hsd.max_requests * sizeof(*hsd.request);
-       hsd.request = thread_data->request;
-
        CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
               mdt_obd_name(mdt), current_pid());
 
-       hsd.mti = mti;
-       obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
+       hsd.hsd_mti = mti;
+       obd_uuid2fsname(hsd.hsd_fsname, mdt_obd_name(mdt),
+                       sizeof(hsd.hsd_fsname));
 
-       set_cdt_state(cdt, CDT_RUNNING, NULL);
+       set_cdt_state(cdt, CDT_RUNNING);
 
        /* Inform mdt_hsm_cdt_start(). */
        wake_up_all(&cdt->cdt_waitq);
@@ -594,9 +616,9 @@ static int mdt_coordinator(void *data)
                if (last_housekeeping + cdt->cdt_loop_period <=
                    ktime_get_real_seconds()) {
                        last_housekeeping = ktime_get_real_seconds();
-                       hsd.housekeeping = true;
+                       hsd.hsd_housekeeping = true;
                } else if (cdt->cdt_event) {
-                       hsd.housekeeping = false;
+                       hsd.hsd_housekeeping = false;
                } else {
                        continue;
                }
@@ -605,7 +627,7 @@ static int mdt_coordinator(void *data)
 
                CDEBUG(D_HSM, "coordinator starts reading llog\n");
 
-               if (hsd.max_requests != cdt->cdt_max_requests) {
+               if (hsd.hsd_request_len != cdt->cdt_max_requests) {
                        /* cdt_max_requests has changed,
                         * we need to allocate a new buffer
                         */
@@ -616,25 +638,29 @@ static int mdt_coordinator(void *data)
                        if (!tmp) {
                                CERROR("Failed to resize request buffer, "
                                       "keeping it at %d\n",
-                                      hsd.max_requests);
-                               cdt->cdt_max_requests = hsd.max_requests;
+                                      hsd.hsd_request_len);
                        } else {
-                               OBD_FREE_LARGE(hsd.request, request_sz);
-                               hsd.max_requests = max_requests;
-                               request_sz = hsd.max_requests *
+                               if (hsd.hsd_request != NULL)
+                                       OBD_FREE_LARGE(hsd.hsd_request,
+                                                      request_sz);
+
+                               hsd.hsd_request_len = max_requests;
+                               request_sz = hsd.hsd_request_len *
                                        sizeof(struct hsm_scan_request);
-                               hsd.request = tmp;
+                               hsd.hsd_request = tmp;
                        }
                }
 
-               hsd.request_cnt = 0;
+               hsd.hsd_action_count = 0;
+               hsd.hsd_request_count = 0;
 
                rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
                                      &hsd, 0, 0, WRITE);
                if (rc < 0)
                        goto clean_cb_alloc;
 
-               CDEBUG(D_HSM, "found %d requests to send\n", hsd.request_cnt);
+               CDEBUG(D_HSM, "found %d requests to send\n",
+                      hsd.hsd_request_count);
 
                if (list_empty(&cdt->cdt_agents)) {
                        CDEBUG(D_HSM, "no agent available, "
@@ -644,9 +670,9 @@ static int mdt_coordinator(void *data)
 
                /* Compute how many HAI we have in all the requests */
                updates_cnt = 0;
-               for (i = 0; i < hsd.request_cnt; i++) {
+               for (i = 0; i < hsd.hsd_request_count; i++) {
                        const struct hsm_scan_request *request =
-                               &hsd.request[i];
+                               &hsd.hsd_request[i];
 
                        updates_cnt += request->hal->hal_count;
                }
@@ -663,8 +689,8 @@ static int mdt_coordinator(void *data)
                }
 
                /* here hsd contains a list of requests to be started */
-               for (i = 0; i < hsd.request_cnt; i++) {
-                       struct hsm_scan_request *request = &hsd.request[i];
+               for (i = 0; i < hsd.hsd_request_count; i++) {
+                       struct hsm_scan_request *request = &hsd.hsd_request[i];
                        struct hsm_action_list  *hal = request->hal;
                        struct hsm_action_item  *hai;
                        int                      j;
@@ -707,15 +733,15 @@ static int mdt_coordinator(void *data)
 
 clean_cb_alloc:
                /* free hal allocated by callback */
-               for (i = 0; i < hsd.request_cnt; i++) {
-                       struct hsm_scan_request *request = &hsd.request[i];
+               for (i = 0; i < hsd.hsd_request_count; i++) {
+                       struct hsm_scan_request *request = &hsd.hsd_request[i];
 
                        OBD_FREE(request->hal, request->hal_sz);
                }
        }
 
-       if (hsd.request)
-               OBD_FREE_LARGE(hsd.request, request_sz);
+       if (hsd.hsd_request != NULL)
+               OBD_FREE_LARGE(hsd.hsd_request, request_sz);
 
        mdt_hsm_cdt_cleanup(mdt);
 
@@ -730,6 +756,56 @@ clean_cb_alloc:
        RETURN(rc);
 }
 
+int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
+                          const struct lu_fid *fid,
+                          const struct hsm_extent *he)
+{
+       struct cdt_restore_handle *crh;
+       struct mdt_object *obj;
+       int rc;
+       ENTRY;
+
+       OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
+       if (crh == NULL)
+               RETURN(-ENOMEM);
+
+       crh->crh_fid = *fid;
+       /* in V1 the whole file is restored
+        * crh->extent.start = he->offset;
+        * crh->extent.end = he->offset + he->length;
+        */
+       crh->crh_extent.start = 0;
+       crh->crh_extent.end = he->length;
+       /* get the layout lock */
+       mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
+       obj = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
+                                  MDS_INODELOCK_LAYOUT);
+       if (IS_ERR(obj))
+               GOTO(out_crh, rc = PTR_ERR(obj));
+
+       /* We do not keep a reference on the object during the restore
+        * which can be very long. */
+       mdt_object_put(mti->mti_env, obj);
+
+       mutex_lock(&cdt->cdt_restore_lock);
+       if (unlikely(cdt->cdt_state == CDT_STOPPED ||
+                    cdt->cdt_state == CDT_STOPPING)) {
+               mutex_unlock(&cdt->cdt_restore_lock);
+               GOTO(out_lh, rc = -EAGAIN);
+       }
+
+       list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+       mutex_unlock(&cdt->cdt_restore_lock);
+
+       RETURN(0);
+out_lh:
+       mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
+out_crh:
+       OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+
+       return rc;
+}
+
 /**
  * lookup a restore handle by FID
  * caller needs to hold cdt_restore_lock
@@ -738,19 +814,41 @@ clean_cb_alloc:
  * \retval cdt_restore_handle found
  * \retval NULL not found
  */
-struct cdt_restore_handle *mdt_hsm_restore_hdl_find(struct coordinator *cdt,
-                                                      const struct lu_fid *fid)
+struct cdt_restore_handle *cdt_restore_handle_find(struct coordinator *cdt,
+                                                  const struct lu_fid *fid)
 {
-       struct cdt_restore_handle       *crh;
+       struct cdt_restore_handle *crh;
        ENTRY;
 
-       list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
+       list_for_each_entry(crh, &cdt->cdt_restore_handle_list, crh_list) {
                if (lu_fid_eq(&crh->crh_fid, fid))
                        RETURN(crh);
        }
+
        RETURN(NULL);
 }
 
+void cdt_restore_handle_del(struct mdt_thread_info *mti,
+                           struct coordinator *cdt, const struct lu_fid *fid)
+{
+       struct cdt_restore_handle *crh;
+
+       /* unlink the handle, then give back its layout lock */
+       mutex_lock(&cdt->cdt_restore_lock);
+       crh = cdt_restore_handle_find(cdt, fid);
+       if (crh != NULL)
+               list_del(&crh->crh_list);
+       mutex_unlock(&cdt->cdt_restore_lock);
+
+       if (crh == NULL)
+               return;
+
+       /* XXX We pass a NULL object since the restore handle does not
+        * keep a reference on the object being restored. */
+       mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
+       OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+}
+
 /**
  * data passed to llog_cat_process() callback
  * to scan requests and take actions
@@ -775,11 +873,9 @@ static int hsm_restore_cb(const struct lu_env *env,
 {
        struct llog_agent_req_rec       *larr;
        struct hsm_restore_data         *hrd;
-       struct cdt_restore_handle       *crh;
        struct hsm_action_item          *hai;
        struct mdt_thread_info          *mti;
        struct coordinator              *cdt;
-       struct mdt_object               *child;
        int rc;
        ENTRY;
 
@@ -810,33 +906,7 @@ static int hsm_restore_cb(const struct lu_env *env,
                        GOTO(out, rc);
        }
 
-       OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
-       if (crh == NULL)
-               RETURN(-ENOMEM);
-
-       crh->crh_fid = hai->hai_fid;
-       /* in V1 all file is restored
-       crh->extent.start = hai->hai_extent.offset;
-       crh->extent.end = hai->hai_extent.offset + hai->hai_extent.length;
-       */
-       crh->crh_extent.start = 0;
-       crh->crh_extent.end = hai->hai_extent.length;
-       /* get the layout lock */
-       mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
-       child = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
-                                    MDS_INODELOCK_LAYOUT);
-       if (IS_ERR(child))
-               GOTO(out, rc = PTR_ERR(child));
-
-       rc = 0;
-       /* we choose to not keep a reference
-        * on the object during the restore time which can be very long */
-       mdt_object_put(mti->mti_env, child);
-
-       mutex_lock(&cdt->cdt_restore_lock);
-       list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
-       mutex_unlock(&cdt->cdt_restore_lock);
-
+       rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid, &hai->hai_extent);
 out:
        RETURN(rc);
 }
@@ -879,6 +949,8 @@ static int hsm_init_ucred(struct lu_ucred *uc)
        uc->uc_umask = 0777;
        uc->uc_ginfo = NULL;
        uc->uc_identity = NULL;
+       /* always record internal HSM activity if also enabled globally */
+       uc->uc_enable_audit = 1;
 
        RETURN(0);
 }
@@ -901,12 +973,12 @@ int mdt_hsm_cdt_init(struct mdt_device *mdt)
        init_rwsem(&cdt->cdt_agent_lock);
        init_rwsem(&cdt->cdt_request_lock);
        mutex_init(&cdt->cdt_restore_lock);
-       spin_lock_init(&cdt->cdt_state_lock);
-       set_cdt_state(cdt, CDT_STOPPED, NULL);
+       mutex_init(&cdt->cdt_state_lock);
+       set_cdt_state(cdt, CDT_STOPPED);
 
        INIT_LIST_HEAD(&cdt->cdt_request_list);
        INIT_LIST_HEAD(&cdt->cdt_agents);
-       INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
+       INIT_LIST_HEAD(&cdt->cdt_restore_handle_list);
 
        cdt->cdt_request_cookie_hash = cfs_hash_create("REQUEST_COOKIE_HASH",
                                                       CFS_HASH_BITS_MIN,
@@ -1014,11 +1086,10 @@ int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
 static int mdt_hsm_cdt_start(struct mdt_device *mdt)
 {
        struct coordinator      *cdt = &mdt->mdt_coordinator;
+       struct mdt_thread_info *cdt_mti;
        int                      rc;
        void                    *ptr;
        struct task_struct      *task;
-       int                      request_sz;
-       struct hsm_thread_data   thread_data;
        ENTRY;
 
        /* functions defined but not yet used
@@ -1026,7 +1097,7 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
         */
        ptr = dump_requests;
 
-       rc = set_cdt_state(cdt, CDT_INIT, NULL);
+       rc = set_cdt_state(cdt, CDT_INIT);
        if (rc) {
                CERROR("%s: Coordinator already started or stopping\n",
                       mdt_obd_name(mdt));
@@ -1051,9 +1122,8 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
         * /proc entries are created by the coordinator thread */
 
        /* set up list of started restore requests */
-       thread_data.cdt_mti =
-               lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       rc = mdt_hsm_pending_restore(thread_data.cdt_mti);
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = mdt_hsm_pending_restore(cdt_mti);
        if (rc)
                CERROR("%s: cannot take the layout locks needed"
                       " for registered restore: %d\n",
@@ -1062,19 +1132,10 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        if (mdt->mdt_bottom->dd_rdonly)
                RETURN(0);
 
-       /* Allocate the initial hsd.request[] vector*/
-       request_sz = cdt->cdt_max_requests * sizeof(struct hsm_scan_request);
-       OBD_ALLOC_LARGE(thread_data.request, request_sz);
-       if (!thread_data.request) {
-               set_cdt_state(cdt, CDT_STOPPED, NULL);
-               RETURN(-ENOMEM);
-       }
-
-       task = kthread_run(mdt_coordinator, &thread_data, "hsm_cdtr");
+       task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
-               set_cdt_state(cdt, CDT_STOPPED, NULL);
-               OBD_FREE(thread_data.request, request_sz);
+               set_cdt_state(cdt, CDT_STOPPED);
                CERROR("%s: error starting coordinator thread: %d\n",
                       mdt_obd_name(mdt), rc);
        } else {
@@ -1100,11 +1161,11 @@ int mdt_hsm_cdt_stop(struct mdt_device *mdt)
 
        ENTRY;
        /* stop coordinator thread */
-       rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
+       rc = set_cdt_state(cdt, CDT_STOPPING);
        if (rc == 0) {
                kthread_stop(cdt->cdt_task);
                cdt->cdt_task = NULL;
-               set_cdt_state(cdt, CDT_STOPPED, NULL);
+               set_cdt_state(cdt, CDT_STOPPED);
        }
 
        RETURN(rc);
@@ -1282,7 +1343,6 @@ out:
  * update status of a completed request
  * \param mti [IN] context
  * \param pgs [IN] progress of the copy tool
- * \param update_record [IN] update llog record
  * \retval 0 success
  * \retval -ve failure
  */
@@ -1448,8 +1508,6 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
         * if no retry will be attempted and if object is still alive,
         * in other cases we just unlock the object */
        if (car->car_hai->hai_action == HSMA_RESTORE) {
-               struct cdt_restore_handle       *crh;
-
                /* restore in data FID done, we swap the layouts
                 * only if restore is successful */
                if (pgs->hpk_errval == 0 && !IS_ERR(obj)) {
@@ -1472,21 +1530,7 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                             &car->car_hai->hai_fid);
                need_changelog = false;
 
-               /* give back layout lock */
-               mutex_lock(&cdt->cdt_restore_lock);
-               crh = mdt_hsm_restore_hdl_find(cdt, &car->car_hai->hai_fid);
-               if (crh != NULL)
-                       list_del(&crh->crh_list);
-               mutex_unlock(&cdt->cdt_restore_lock);
-               /* Just give back layout lock, we keep the reference
-                * which is given back later with the lock for HSM
-                * flags.
-                * XXX obj may be invalid so we do not pass it. */
-               if (crh != NULL)
-                       mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
-
-               if (crh != NULL)
-                       OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+               cdt_restore_handle_del(mti, cdt, &car->car_hai->hai_fid);
        }
 
        GOTO(out, rc);
@@ -1507,13 +1551,11 @@ out:
  * update status of a request
  * \param mti [IN] context
  * \param pgs [IN] progress of the copy tool
- * \param update_record [IN] update llog record
  * \retval 0 success
  * \retval -ve failure
  */
 int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
-                                struct hsm_progress_kernel *pgs,
-                                const int update_record)
+                                struct hsm_progress_kernel *pgs)
 {
        struct mdt_device       *mdt = mti->mti_mdt;
        struct coordinator      *cdt = &mdt->mdt_coordinator;
@@ -1582,36 +1624,32 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
        hsm_init_ucred(mdt_ucred(mti));
 
        if (pgs->hpk_flags & HP_FLAG_COMPLETED) {
-               enum agent_req_status    status;
+               enum agent_req_status status;
+               struct hsm_record_update update;
+               int rc1;
 
                rc = hsm_cdt_request_completed(mti, pgs, car, &status);
 
-               CDEBUG(D_HSM, "%s record: fid="DFID" cookie=%#llx action=%s "
+               CDEBUG(D_HSM, "updating record: fid="DFID" cookie=%#llx action=%s "
                              "status=%s\n",
-                      update_record ? "Updating" : "Not updating",
                       PFID(&pgs->hpk_fid), pgs->hpk_cookie,
                       hsm_copytool_action2name(car->car_hai->hai_action),
                       agent_req_status2name(status));
 
                /* update record first (LU-9075) */
-               if (update_record) {
-                       int rc1;
-                       struct hsm_record_update update = {
-                               .cookie = pgs->hpk_cookie,
-                               .status = status,
-                       };
-
-                       rc1 = mdt_agent_record_update(mti->mti_env, mdt,
-                                                     &update, 1);
-                       if (rc1)
-                               CERROR("%s: mdt_agent_record_update() failed,"
-                                      " rc=%d, cannot update status to %s"
-                                      " for cookie %#llx\n",
-                                      mdt_obd_name(mdt), rc1,
-                                      agent_req_status2name(status),
-                                      pgs->hpk_cookie);
-                       rc = (rc != 0 ? rc : rc1);
-               }
+               update.cookie = pgs->hpk_cookie;
+               update.status = status;
+
+               rc1 = mdt_agent_record_update(mti->mti_env, mdt,
+                                             &update, 1);
+               if (rc1)
+                       CERROR("%s: mdt_agent_record_update() failed,"
+                              " rc=%d, cannot update status to %s"
+                              " for cookie %#llx\n",
+                              mdt_obd_name(mdt), rc1,
+                              agent_req_status2name(status),
+                              pgs->hpk_cookie);
+               rc = (rc != 0 ? rc : rc1);
 
                /* then remove request from memory list (LU-9075) */
                mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
@@ -1713,10 +1751,13 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
 
        hsm_init_ucred(mdt_ucred(mti));
 
+       mutex_lock(&cdt->cdt_state_lock);
+       old_state = cdt->cdt_state;
+
        /* disable coordinator */
-       rc = set_cdt_state(cdt, CDT_DISABLE, &old_state);
+       rc = set_cdt_state_locked(cdt, CDT_DISABLE);
        if (rc)
-               RETURN(rc);
+               GOTO(out_cdt_state_unlock, rc);
 
        /* send cancel to all running requests */
        down_read(&cdt->cdt_request_lock);
@@ -1790,7 +1831,10 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
                              &hcad, 0, 0, WRITE);
 out_cdt_state:
        /* Enable coordinator, unless the coordinator was stopping. */
-       set_cdt_state(cdt, old_state, NULL);
+       set_cdt_state_locked(cdt, old_state);
+out_cdt_state_unlock:
+       mutex_unlock(&cdt->cdt_state_lock);
+
        lu_context_exit(&session);
        lu_context_fini(&session);
 out_env:
@@ -2033,14 +2077,15 @@ mdt_hsm_##VAR##_seq_write(struct file *file, const char __user *buffer, \
        struct seq_file         *m = file->private_data;                \
        struct mdt_device       *mdt = m->private;                      \
        struct coordinator      *cdt = &mdt->mdt_coordinator;           \
-       __s64                    val;                                   \
-       int                      rc;                                    \
-       ENTRY;                                                          \
+       unsigned int val;                                               \
+       int rc;                                                         \
                                                                        \
-       rc = lprocfs_str_to_s64(buffer, count, &val);                   \
+       ENTRY;                                                          \
+       rc = kstrtouint_from_user(buffer, count, 0, &val);              \
        if (rc)                                                         \
                RETURN(rc);                                             \
-       if (val > 0 && val < INT_MAX) {                                 \
+                                                                       \
+       if (val !=  0) {                                                \
                cdt->VAR = val;                                         \
                RETURN(count);                                          \
        }                                                               \
@@ -2089,7 +2134,7 @@ mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
        rc = 0;
        if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
                if (cdt->cdt_state == CDT_DISABLE) {
-                       rc = set_cdt_state(cdt, CDT_RUNNING, NULL);
+                       rc = set_cdt_state(cdt, CDT_RUNNING);
                        mdt_hsm_cdt_event(cdt);
                        wake_up(&cdt->cdt_waitq);
                } else {
@@ -2111,7 +2156,7 @@ mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
                               mdt_obd_name(mdt));
                        rc = -EINVAL;
                } else {
-                       rc = set_cdt_state(cdt, CDT_DISABLE, NULL);
+                       rc = set_cdt_state(cdt, CDT_DISABLE);
                }
        } else if (strcmp(kernbuf, CDT_PURGE_CMD) == 0) {
                rc = hsm_cancel_all_actions(mdt);
@@ -2310,11 +2355,11 @@ mdt_hsm_cdt_raolu_seq_write(struct file *file, const char __user *buffer,
        struct seq_file *m = file->private_data;
        struct mdt_device *mdt = m->private;
        struct coordinator *cdt = &mdt->mdt_coordinator;
-       __s64 val;
+       bool val;
        int rc;
-       ENTRY;
 
-       rc = lprocfs_str_to_s64(buffer, count, &val);
+       ENTRY;
+       rc = kstrtobool_from_user(buffer, count, &val);
        if (rc < 0)
                RETURN(rc);