Whamcloud - gitweb
LU-15132 hsm: Protect against parallel HSM restore requests
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
index 9f371bc..b321d94 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2011, 2012 Commissariat a l'energie atomique et aux energies
  *                          alternatives
  *
- * Copyright (c) 2013, 2014, Intel Corporation.
+ * Copyright (c) 2013, 2017, Intel Corporation.
  * Use is subject to license terms.
  */
 /*
@@ -40,7 +40,6 @@
 
 #include <linux/kthread.h>
 #include <obd_support.h>
-#include <lustre_net.h>
 #include <lustre_export.h>
 #include <obd.h>
 #include <lprocfs_status.h>
@@ -48,8 +47,6 @@
 #include <lustre_kernelcomm.h>
 #include "mdt_internal.h"
 
-static struct lprocfs_vars lprocfs_mdt_hsm_vars[];
-
 /**
  * get obj and HSM attributes on a fid
  * \param mti [IN] context
@@ -102,21 +99,21 @@ void mdt_hsm_dump_hal(int level, const char *prefix,
        struct hsm_action_item  *hai;
        char                     buf[12];
 
-       CDEBUG(level, "%s: HAL header: version %X count %d compound "LPX64
-                     " archive_id %d flags "LPX64"\n",
+       CDEBUG(level, "%s: HAL header: version %X count %d"
+                     " archive_id %d flags %#llx\n",
               prefix, hal->hal_version, hal->hal_count,
-              hal->hal_compound_id, hal->hal_archive_id, hal->hal_flags);
+              hal->hal_archive_id, hal->hal_flags);
 
        hai = hai_first(hal);
        for (i = 0; i < hal->hal_count; i++) {
                sz = hai->hai_len - sizeof(*hai);
                CDEBUG(level, "%s %d: fid="DFID" dfid="DFID
-                      " compound/cookie="LPX64"/"LPX64
-                      " action=%s extent="LPX64"-"LPX64" gid="LPX64
+                      " cookie=%#llx"
+                      " action=%s extent=%#llx-%#llx gid=%#llx"
                       " datalen=%d data=[%s]\n",
                       prefix, i,
                       PFID(&hai->hai_fid), PFID(&hai->hai_dfid),
-                      hal->hal_compound_id, hai->hai_cookie,
+                      hai->hai_cookie,
                       hsm_copytool_action2name(hai->hai_action),
                       hai->hai_extent.offset,
                       hai->hai_extent.length,
@@ -130,24 +127,287 @@ void mdt_hsm_dump_hal(int level, const char *prefix,
  * data passed to llog_cat_process() callback
  * to scan requests and take actions
  */
+struct hsm_scan_request {
+       int                      hal_sz;
+       int                      hal_used_sz;
+       struct hsm_action_list  *hal;
+};
+
 struct hsm_scan_data {
-       struct mdt_thread_info          *mti;
-       char                             fs_name[MTI_NAME_MAXLEN+1];
-       /* request to be send to agents */
-       int                              request_sz;    /** allocated size */
-       int                              max_requests;  /** vector size */
-       int                              request_cnt;   /** used count */
-       struct {
-               int                      hal_sz;
-               int                      hal_used_sz;
-               struct hsm_action_list  *hal;
-       } *request;
-       /* records to be canceled */
-       int                              max_cookie;    /** vector size */
-       int                              cookie_cnt;    /** used count */
-       __u64                           *cookies;
+       struct mdt_thread_info  *hsd_mti;
+       char                     hsd_fsname[MTI_NAME_MAXLEN + 1];
+       /* are we scanning the logs for housekeeping, or just looking
+        * for new work?
+        */
+       bool                     hsd_housekeeping;
+       bool                     hsd_one_restore;
+       u32                      hsd_start_cat_idx;
+       u32                      hsd_start_rec_idx;
+       int                      hsd_action_count;
+       int                      hsd_request_len; /* array alloc len */
+       int                      hsd_request_count; /* array used count */
+       struct hsm_scan_request *hsd_request;
 };
 
+static int mdt_cdt_waiting_cb(const struct lu_env *env,
+                             struct mdt_device *mdt,
+                             struct llog_handle *llh,
+                             struct llog_agent_req_rec *larr,
+                             struct hsm_scan_data *hsd)
+{
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       struct hsm_scan_request *request;
+       struct hsm_action_item *hai;
+       size_t hai_size;
+       u32 archive_id;
+       bool wrapped;
+       int i;
+
+       /* Are agents full? */
+       if (atomic_read(&cdt->cdt_request_count) >= cdt->cdt_max_requests)
+               RETURN(hsd->hsd_housekeeping ? 0 : LLOG_PROC_BREAK);
+
+       if (hsd->hsd_action_count + atomic_read(&cdt->cdt_request_count) >=
+           cdt->cdt_max_requests) {
+               /* We cannot send any more request
+                *
+                *                     *** SPECIAL CASE ***
+                *
+                * Restore requests are too important not to schedule at least
+                * one, everytime we can.
+                */
+               if (larr->arr_hai.hai_action != HSMA_RESTORE ||
+                   hsd->hsd_one_restore)
+                       RETURN(hsd->hsd_housekeeping ? 0 : LLOG_PROC_BREAK);
+       }
+
+       hai_size = cfs_size_round(larr->arr_hai.hai_len);
+       archive_id = larr->arr_archive_id;
+
+       /* Can we add this action to one of the existing HALs in hsd. */
+       request = NULL;
+       for (i = 0; i < hsd->hsd_request_count; i++) {
+               if (hsd->hsd_request[i].hal->hal_archive_id == archive_id &&
+                   hsd->hsd_request[i].hal_used_sz + hai_size <=
+                   LDLM_MAXREQSIZE) {
+                       request = &hsd->hsd_request[i];
+                       break;
+               }
+       }
+
+       /* Are we trying to force-schedule a request? */
+       if (hsd->hsd_action_count + atomic_read(&cdt->cdt_request_count) >=
+           cdt->cdt_max_requests) {
+               /* Is there really no compatible hsm_scan_request? */
+               if (!request) {
+                       for (i -= 1; i >= 0; i--) {
+                               if (hsd->hsd_request[i].hal->hal_archive_id ==
+                                   archive_id) {
+                                       request = &hsd->hsd_request[i];
+                                       break;
+                               }
+                       }
+               }
+
+               /* Make room for the hai */
+               if (request) {
+                       /* Discard the last hai until there is enough space */
+                       do {
+                               request->hal->hal_count--;
+
+                               hai = hai_first(request->hal);
+                               for (i = 0; i < request->hal->hal_count; i++)
+                                       hai = hai_next(hai);
+                               request->hal_used_sz -=
+                                       cfs_size_round(hai->hai_len);
+                               hsd->hsd_action_count--;
+                       } while (request->hal_used_sz + hai_size >
+                                LDLM_MAXREQSIZE);
+               } else if (hsd->hsd_housekeeping) {
+                       struct hsm_scan_request *tmp;
+
+                       /* Discard the (whole) last hal */
+                       hsd->hsd_request_count--;
+                       LASSERT(hsd->hsd_request_count >= 0);
+                       tmp = &hsd->hsd_request[hsd->hsd_request_count];
+                       hsd->hsd_action_count -= tmp->hal->hal_count;
+                       LASSERT(hsd->hsd_action_count >= 0);
+                       OBD_FREE(tmp->hal, tmp->hal_sz);
+               } else {
+                       /* Bailing out, this code path is too hot */
+                       RETURN(LLOG_PROC_BREAK);
+
+               }
+       }
+
+       if (!request) {
+               struct hsm_action_list *hal;
+
+               LASSERT(hsd->hsd_request_count < hsd->hsd_request_len);
+               request = &hsd->hsd_request[hsd->hsd_request_count];
+
+               /* allocates hai vector size just needs to be large
+                * enough */
+               request->hal_sz = sizeof(*request->hal) +
+                       cfs_size_round(MTI_NAME_MAXLEN + 1) + 2 * hai_size;
+               OBD_ALLOC_LARGE(hal, request->hal_sz);
+               if (!hal)
+                       RETURN(-ENOMEM);
+
+               hal->hal_version = HAL_VERSION;
+               strlcpy(hal->hal_fsname, hsd->hsd_fsname, MTI_NAME_MAXLEN + 1);
+               hal->hal_archive_id = larr->arr_archive_id;
+               hal->hal_flags = larr->arr_flags;
+               hal->hal_count = 0;
+               request->hal_used_sz = hal_size(hal);
+               request->hal = hal;
+               hsd->hsd_request_count++;
+       } else if (request->hal_sz < request->hal_used_sz + hai_size) {
+               /* Not enough room, need an extension */
+               void *hal_buffer;
+               int sz;
+
+               sz = min_t(int, 2 * request->hal_sz, LDLM_MAXREQSIZE);
+               LASSERT(request->hal_used_sz + hai_size < sz);
+
+               OBD_ALLOC_LARGE(hal_buffer, sz);
+               if (!hal_buffer)
+                       RETURN(-ENOMEM);
+
+               memcpy(hal_buffer, request->hal, request->hal_used_sz);
+               OBD_FREE_LARGE(request->hal, request->hal_sz);
+               request->hal = hal_buffer;
+               request->hal_sz = sz;
+       }
+
+       hai = hai_first(request->hal);
+       for (i = 0; i < request->hal->hal_count; i++)
+               hai = hai_next(hai);
+
+       memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
+
+       request->hal_used_sz += hai_size;
+       request->hal->hal_count++;
+
+       hsd->hsd_action_count++;
+
+       switch (hai->hai_action) {
+       case HSMA_CANCEL:
+               break;
+       case HSMA_RESTORE:
+               hsd->hsd_one_restore = true;
+               fallthrough;
+       default:
+               cdt_agent_record_hash_add(cdt, hai->hai_cookie,
+                                         llh->lgh_hdr->llh_cat_idx,
+                                         larr->arr_hdr.lrh_index);
+       }
+
+       wrapped = llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
+                 llh->lgh_hdr->llh_count > 1;
+       if ((!wrapped && llh->lgh_hdr->llh_cat_idx > hsd->hsd_start_cat_idx) ||
+           (wrapped && llh->lgh_hdr->llh_cat_idx < hsd->hsd_start_cat_idx) ||
+           (llh->lgh_hdr->llh_cat_idx == hsd->hsd_start_cat_idx &&
+            larr->arr_hdr.lrh_index > hsd->hsd_start_rec_idx)) {
+               hsd->hsd_start_cat_idx = llh->lgh_hdr->llh_cat_idx;
+               hsd->hsd_start_rec_idx = larr->arr_hdr.lrh_index;
+       }
+
+       RETURN(0);
+}
+
+static int mdt_cdt_started_cb(const struct lu_env *env,
+                             struct mdt_device *mdt,
+                             struct llog_handle *llh,
+                             struct llog_agent_req_rec *larr,
+                             struct hsm_scan_data *hsd)
+{
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       struct hsm_action_item *hai = &larr->arr_hai;
+       struct cdt_agent_req *car;
+       time64_t now = ktime_get_real_seconds();
+       time64_t last;
+       enum changelog_rec_flags clf_flags;
+       int rc;
+
+       if (!hsd->hsd_housekeeping)
+               RETURN(0);
+
+       /* we search for a running request
+        * error may happen if coordinator crashes or stopped
+        * with running request
+        */
+       car = mdt_cdt_find_request(cdt, hai->hai_cookie);
+       if (car == NULL) {
+               last = larr->arr_req_change;
+       } else {
+               last = car->car_req_update;
+       }
+
+       /* test if request too long, if yes cancel it
+        * the same way the copy tool acknowledge a cancel request */
+       if (now <= last + cdt->cdt_active_req_timeout)
+               GOTO(out_car, rc = 0);
+
+       dump_llog_agent_req_rec("request timed out, start cleaning", larr);
+
+       if (car != NULL) {
+               car->car_req_update = now;
+               mdt_hsm_agent_update_statistics(cdt, 0, 1, 0, &car->car_uuid);
+               /* Remove car from memory list (LU-9075) */
+               mdt_cdt_remove_request(cdt, hai->hai_cookie);
+       }
+
+       /* Emit a changelog record for the failed action.*/
+       clf_flags = 0;
+       hsm_set_cl_error(&clf_flags, ECANCELED);
+
+       switch (hai->hai_action) {
+       case HSMA_ARCHIVE:
+               hsm_set_cl_event(&clf_flags, HE_ARCHIVE);
+               break;
+       case HSMA_RESTORE:
+               hsm_set_cl_event(&clf_flags, HE_RESTORE);
+               break;
+       case HSMA_REMOVE:
+               hsm_set_cl_event(&clf_flags, HE_REMOVE);
+               break;
+       case HSMA_CANCEL:
+               hsm_set_cl_event(&clf_flags, HE_CANCEL);
+               break;
+       default:
+               /* Unknown record type, skip changelog. */
+               clf_flags = 0;
+               break;
+       }
+
+       if (clf_flags != 0)
+               mo_changelog(env, CL_HSM, clf_flags, mdt->mdt_child,
+                            &hai->hai_fid);
+
+       if (hai->hai_action == HSMA_RESTORE)
+               cdt_restore_handle_del(hsd->hsd_mti, cdt, &hai->hai_fid);
+
+       larr->arr_status = ARS_CANCELED;
+       larr->arr_req_change = now;
+       rc = llog_write(hsd->hsd_mti->mti_env, llh, &larr->arr_hdr,
+                       larr->arr_hdr.lrh_index);
+       if (rc < 0) {
+               CERROR("%s: cannot update agent log: rc = %d\n",
+                      mdt_obd_name(mdt), rc);
+               rc = LLOG_DEL_RECORD;
+       }
+
+       /* ct has completed a request, so a slot is available,
+        * signal the coordinator to find new work */
+       mdt_hsm_cdt_event(cdt);
+out_car:
+       if (car != NULL)
+               mdt_cdt_put_request(car);
+
+       RETURN(rc);
+}
+
 /**
  *  llog_cat_process() callback, used to:
  *  - find waiting request and start action
@@ -164,280 +424,158 @@ static int mdt_coordinator_cb(const struct lu_env *env,
                              struct llog_rec_hdr *hdr,
                              void *data)
 {
-       const struct llog_agent_req_rec *larr;
-       struct hsm_scan_data            *hsd;
-       struct hsm_action_item          *hai;
-       struct mdt_device               *mdt;
-       struct coordinator              *cdt;
-       int                              rc;
+       struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
+       struct hsm_scan_data *hsd = data;
+       struct mdt_device *mdt = hsd->hsd_mti->mti_mdt;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
        ENTRY;
 
-       hsd = data;
-       mdt = hsd->mti->mti_mdt;
-       cdt = &mdt->mdt_coordinator;
-
        larr = (struct llog_agent_req_rec *)hdr;
        dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
        switch (larr->arr_status) {
-       case ARS_WAITING: {
-               int i, empty_slot, found;
-
-               /* Are agents full? */
-               if (atomic_read(&cdt->cdt_request_count) ==
-                   cdt->cdt_max_requests)
-                       break;
-
-               /* first search whether the request is found in the list we
-                * have built and if there is room in the request vector */
-               empty_slot = -1;
-               found = -1;
-               for (i = 0; i < hsd->max_requests &&
-                           (empty_slot == -1 || found == -1); i++) {
-                       if (hsd->request[i].hal == NULL) {
-                               empty_slot = i;
-                               continue;
-                       }
-                       if (hsd->request[i].hal->hal_compound_id ==
-                               larr->arr_compound_id) {
-                               found = i;
-                               continue;
-                       }
-               }
-               if (found == -1 && empty_slot == -1)
-                       /* unknown request and no more room for new request,
-                        * continue scan for to find other entries for
-                        * already found request
-                        */
+       case ARS_WAITING:
+               RETURN(mdt_cdt_waiting_cb(env, mdt, llh, larr, hsd));
+       case ARS_STARTED:
+               RETURN(mdt_cdt_started_cb(env, mdt, llh, larr, hsd));
+       default:
+               if (!hsd->hsd_housekeeping)
                        RETURN(0);
 
-               if (found == -1) {
-                       struct hsm_action_list *hal;
-
-                       /* request is not already known */
-                       /* allocates hai vector size just needs to be large
-                        * enough */
-                       hsd->request[empty_slot].hal_sz =
-                                    sizeof(*hsd->request[empty_slot].hal) +
-                                    cfs_size_round(MTI_NAME_MAXLEN+1) +
-                                    2 * cfs_size_round(larr->arr_hai.hai_len);
-                       OBD_ALLOC(hal, hsd->request[empty_slot].hal_sz);
-                       if (!hal) {
-                               CERROR("%s: Cannot allocate memory (%d o)"
-                                      "for compound "LPX64"\n",
-                                      mdt_obd_name(mdt),
-                                      hsd->request[i].hal_sz,
-                                      larr->arr_compound_id);
-                               RETURN(-ENOMEM);
-                       }
-                       hal->hal_version = HAL_VERSION;
-                       strlcpy(hal->hal_fsname, hsd->fs_name,
-                               MTI_NAME_MAXLEN + 1);
-                       hal->hal_compound_id = larr->arr_compound_id;
-                       hal->hal_archive_id = larr->arr_archive_id;
-                       hal->hal_flags = larr->arr_flags;
-                       hal->hal_count = 0;
-                       hsd->request[empty_slot].hal_used_sz = hal_size(hal);
-                       hsd->request[empty_slot].hal = hal;
-                       hsd->request_cnt++;
-                       found = empty_slot;
-                       hai = hai_first(hal);
-               } else {
-                       /* request is known */
-                       /* we check if record archive num is the same as the
-                        * known request, if not we will serve it in multiple
-                        * time because we do not know if the agent can serve
-                        * multiple backend
-                        * a use case is a compound made of multiple restore
-                        * where the files are not archived in the same backend
-                        */
-                       if (larr->arr_archive_id !=
-                           hsd->request[found].hal->hal_archive_id)
-                               RETURN(0);
-
-                       if (hsd->request[found].hal_sz <
-                           hsd->request[found].hal_used_sz +
-                            cfs_size_round(larr->arr_hai.hai_len)) {
-                               /* Not enough room, need an extension */
-                               void *hal_buffer;
-                               int sz;
-
-                               sz = 2 * hsd->request[found].hal_sz;
-                               OBD_ALLOC(hal_buffer, sz);
-                               if (!hal_buffer) {
-                                       CERROR("%s: Cannot allocate memory "
-                                              "(%d o) for compound "LPX64"\n",
-                                              mdt_obd_name(mdt), sz,
-                                              larr->arr_compound_id);
-                                       RETURN(-ENOMEM);
-                               }
-                               memcpy(hal_buffer, hsd->request[found].hal,
-                                      hsd->request[found].hal_used_sz);
-                               OBD_FREE(hsd->request[found].hal,
-                                        hsd->request[found].hal_sz);
-                               hsd->request[found].hal = hal_buffer;
-                               hsd->request[found].hal_sz = sz;
-                       }
-                       hai = hai_first(hsd->request[found].hal);
-                       for (i = 0; i < hsd->request[found].hal->hal_count;
-                            i++)
-                               hai = hai_next(hai);
+               if ((larr->arr_req_change + cdt->cdt_grace_delay) <
+                   ktime_get_real_seconds()) {
+                       cdt_agent_record_hash_del(cdt,
+                                                 larr->arr_hai.hai_cookie);
+                       RETURN(LLOG_DEL_RECORD);
                }
-               memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
-               hai->hai_cookie = larr->arr_hai.hai_cookie;
-               hai->hai_gid = larr->arr_hai.hai_gid;
 
-               hsd->request[found].hal_used_sz +=
-                                                  cfs_size_round(hai->hai_len);
-               hsd->request[found].hal->hal_count++;
-               break;
+               RETURN(0);
        }
-       case ARS_STARTED: {
-               struct cdt_agent_req *car;
-               cfs_time_t last;
+}
 
-               /* we search for a running request
-                * error may happen if coordinator crashes or stopped
-                * with running request
-                */
-               car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie, NULL);
-               if (car == NULL) {
-                       last = larr->arr_req_create;
-               } else {
-                       last = car->car_req_update;
-                       mdt_cdt_put_request(car);
-               }
+/* Release the ressource used by the coordinator. Called when the
+ * coordinator is stopping. */
+static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
+{
+       struct coordinator              *cdt = &mdt->mdt_coordinator;
+       struct cdt_agent_req            *car, *tmp1;
+       struct hsm_agent                *ha, *tmp2;
+       struct cdt_restore_handle       *crh, *tmp3;
+       struct mdt_thread_info          *cdt_mti;
 
-               /* test if request too long, if yes cancel it
-                * the same way the copy tool acknowledge a cancel request */
-               if ((last + cdt->cdt_active_req_timeout)
-                    < cfs_time_current_sec()) {
-                       struct hsm_progress_kernel pgs;
-
-                       dump_llog_agent_req_rec("mdt_coordinator_cb(): "
-                                               "request timed out, start "
-                                               "cleaning", larr);
-                       /* a too old cancel request just needs to be removed
-                        * this can happen, if copy tool does not support cancel
-                        * for other requests, we have to remove the running
-                        * request and notify the copytool
-                        */
-                       pgs.hpk_fid = larr->arr_hai.hai_fid;
-                       pgs.hpk_cookie = larr->arr_hai.hai_cookie;
-                       pgs.hpk_extent = larr->arr_hai.hai_extent;
-                       pgs.hpk_flags = HP_FLAG_COMPLETED;
-                       pgs.hpk_errval = ENOSYS;
-                       pgs.hpk_data_version = 0;
-                       /* update request state, but do not record in llog, to
-                        * avoid deadlock on cdt_llog_lock
-                        */
-                       rc = mdt_hsm_update_request_state(hsd->mti, &pgs, 0);
-                       if (rc)
-                               CERROR("%s: Cannot cleanup timed out request: "
-                                      DFID" for cookie "LPX64" action=%s\n",
-                                      mdt_obd_name(mdt),
-                                      PFID(&pgs.hpk_fid), pgs.hpk_cookie,
-                                      hsm_copytool_action2name(
-                                                    larr->arr_hai.hai_action));
-
-                       if (rc == -ENOENT) {
-                               /* The request no longer exists, forget
-                                * about it, and do not send a cancel request
-                                * to the client, for which an error will be
-                                * sent back, leading to an endless cycle of
-                                * cancellation. */
-                               RETURN(LLOG_DEL_RECORD);
-                       }
+       /* start cleaning */
+       down_write(&cdt->cdt_request_lock);
+       list_for_each_entry_safe(car, tmp1, &cdt->cdt_request_list,
+                                car_request_list) {
+               cfs_hash_del(cdt->cdt_request_cookie_hash,
+                            &car->car_hai->hai_cookie,
+                            &car->car_cookie_hash);
+               list_del(&car->car_request_list);
+               mdt_cdt_put_request(car);
+       }
+       up_write(&cdt->cdt_request_lock);
 
-                       /* add the cookie to the list of record to be
-                        * canceled by caller */
-                       if (hsd->max_cookie == (hsd->cookie_cnt - 1)) {
-                               __u64 *ptr, *old_ptr;
-                               int old_sz, new_sz, new_cnt;
-
-                               /* need to increase vector size */
-                               old_sz = sizeof(__u64) * hsd->max_cookie;
-                               old_ptr = hsd->cookies;
-
-                               new_cnt = 2 * hsd->max_cookie;
-                               new_sz = sizeof(__u64) * new_cnt;
-
-                               OBD_ALLOC(ptr, new_sz);
-                               if (!ptr) {
-                                       CERROR("%s: Cannot allocate memory "
-                                              "(%d o) for cookie vector\n",
-                                              mdt_obd_name(mdt), new_sz);
-                                       RETURN(-ENOMEM);
-                               }
-                               memcpy(ptr, hsd->cookies, old_sz);
-                               hsd->cookies = ptr;
-                               hsd->max_cookie = new_cnt;
-                               OBD_FREE(old_ptr, old_sz);
-                       }
-                       hsd->cookies[hsd->cookie_cnt] =
-                                                      larr->arr_hai.hai_cookie;
-                       hsd->cookie_cnt++;
-               }
-               break;
+       down_write(&cdt->cdt_agent_lock);
+       list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
+               list_del(&ha->ha_list);
+               if (ha->ha_archive_cnt != 0)
+                       OBD_FREE_PTR_ARRAY(ha->ha_archive_id,
+                                          ha->ha_archive_cnt);
+               OBD_FREE_PTR(ha);
        }
-       case ARS_FAILED:
-       case ARS_CANCELED:
-       case ARS_SUCCEED:
-               if ((larr->arr_req_change + cdt->cdt_grace_delay) <
-                   cfs_time_current_sec())
-                       RETURN(LLOG_DEL_RECORD);
-               break;
+       up_write(&cdt->cdt_agent_lock);
+
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       mutex_lock(&cdt->cdt_restore_lock);
+       list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_handle_list,
+                                crh_list) {
+               /* not locked yet, cleanup by cdt_restore_handle_add() */
+               if (crh->crh_lh.mlh_type == MDT_NUL_LOCK)
+                       continue;
+               list_del(&crh->crh_list);
+               /* give back layout lock */
+               mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
+               OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
        }
-       RETURN(0);
+       mutex_unlock(&cdt->cdt_restore_lock);
 }
 
+/*
+ * Coordinator state transition table, indexed on enum cdt_states, taking
+ * from and to states. For instance since CDT_INIT to CDT_RUNNING is a
+ * valid transition, cdt_transition[CDT_INIT][CDT_RUNNING] is true.
+ */
+static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
+       /* from -> to:    stopped init   running disable stopping */
+       /* stopped */   { true,   true,  false,  false,  false },
+       /* init */      { true,   false, true,   false,  false },
+       /* running */   { false,  false, true,   true,   true },
+       /* disable */   { false,  false, true,   true,   true },
+       /* stopping */  { true,   false, false,  false,  false }
+};
+
 /**
- * create /proc entries for coordinator
- * \param mdt [IN]
- * \retval 0 success
- * \retval -ve failure
+ * Change coordinator thread state
+ * Some combinations are not valid, so catch them here.
+ *
+ * Returns 0 on success, with old_state set if not NULL, or -EINVAL if
+ * the transition was not possible.
  */
-int hsm_cdt_procfs_init(struct mdt_device *mdt)
+static int set_cdt_state_locked(struct coordinator *cdt,
+                               enum cdt_states new_state)
 {
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
-       int                      rc = 0;
-       ENTRY;
+       int rc;
+       enum cdt_states state;
 
-       /* init /proc entries, failure is not critical */
-       cdt->cdt_proc_dir = lprocfs_register("hsm",
-                                            mdt2obd_dev(mdt)->obd_proc_entry,
-                                            lprocfs_mdt_hsm_vars, mdt);
-       if (IS_ERR(cdt->cdt_proc_dir)) {
-               rc = PTR_ERR(cdt->cdt_proc_dir);
-               CERROR("%s: Cannot create 'hsm' directory in mdt proc dir,"
-                      " rc=%d\n", mdt_obd_name(mdt), rc);
-               cdt->cdt_proc_dir = NULL;
-               RETURN(rc);
+       state = cdt->cdt_state;
+
+       if (cdt_transition[state][new_state]) {
+               cdt->cdt_state = new_state;
+               rc = 0;
+       } else {
+               CDEBUG(D_HSM,
+                      "unexpected coordinator transition, from=%s, to=%s\n",
+                      cdt_mdt_state2str(state), cdt_mdt_state2str(new_state));
+               rc = -EINVAL;
        }
 
-       RETURN(0);
+       return rc;
 }
 
-/**
- * remove /proc entries for coordinator
- * \param mdt [IN]
- */
-void  hsm_cdt_procfs_fini(struct mdt_device *mdt)
+static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state)
 {
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       int rc;
+
+       mutex_lock(&cdt->cdt_state_lock);
+       rc = set_cdt_state_locked(cdt, new_state);
+       mutex_unlock(&cdt->cdt_state_lock);
 
-       LASSERT(cdt->cdt_state == CDT_STOPPED);
-       if (cdt->cdt_proc_dir != NULL)
-               lprocfs_remove(&cdt->cdt_proc_dir);
+       return rc;
 }
 
-/**
- * get vector of hsm cdt /proc vars
- * \param none
- * \retval var vector
- */
-struct lprocfs_vars *hsm_cdt_get_proc_vars(void)
+static int mdt_hsm_pending_restore(struct mdt_thread_info *mti);
+
+static void cdt_start_pending_restore(struct mdt_device *mdt,
+                                     struct coordinator *cdt)
 {
-       return lprocfs_mdt_hsm_vars;
+       struct mdt_thread_info *cdt_mti;
+       unsigned int i = 0;
+       int rc;
+
+       /* wait until MDD initialize hsm actions llog */
+       while (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state) && i < obd_timeout) {
+               schedule_timeout_interruptible(cfs_time_seconds(1));
+               i++;
+       }
+       if (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state))
+               CWARN("%s: trying to init HSM before MDD\n", mdt_obd_name(mdt));
+
+       /* set up list of started restore requests */
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = mdt_hsm_pending_restore(cdt_mti);
+       if (rc)
+               CERROR("%s: cannot take the layout locks needed for registered restore: %d\n",
+                      mdt_obd_name(mdt), rc);
+
 }
 
 /**
@@ -452,246 +590,294 @@ static int mdt_coordinator(void *data)
        struct mdt_device       *mdt = mti->mti_mdt;
        struct coordinator      *cdt = &mdt->mdt_coordinator;
        struct hsm_scan_data     hsd = { NULL };
-       int                      rc = 0;
+       time64_t                 last_housekeeping = 0;
+       size_t request_sz = 0;
+       int rc;
        ENTRY;
 
-       cdt->cdt_thread.t_flags = SVC_RUNNING;
-       wake_up(&cdt->cdt_thread.t_ctl_waitq);
-
        CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
-              mdt_obd_name(mdt), current_pid());
-
-       /* timeouted cookie vector initialization */
-       hsd.max_cookie = 0;
-       hsd.cookie_cnt = 0;
-       hsd.cookies = NULL;
-       /* we use a copy of cdt_max_requests in the cb, so if cdt_max_requests
-        * increases due to a change from /proc we do not overflow the
-        * hsd.request[] vector
-        */
-       hsd.max_requests = cdt->cdt_max_requests;
-       hsd.request_sz = hsd.max_requests * sizeof(*hsd.request);
-       OBD_ALLOC(hsd.request, hsd.request_sz);
-       if (!hsd.request)
-               GOTO(out, rc = -ENOMEM);
+              mdt_obd_name(mdt), current->pid);
+
+       hsd.hsd_mti = mti;
+       obd_uuid2fsname(hsd.hsd_fsname, mdt_obd_name(mdt),
+                       sizeof(hsd.hsd_fsname));
+
+       set_cdt_state(cdt, CDT_RUNNING);
 
-       hsd.mti = mti;
-       obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
+       /* Inform mdt_hsm_cdt_start(). */
+       wake_up(&cdt->cdt_waitq);
+       cdt_start_pending_restore(mdt, cdt);
 
        while (1) {
-               struct l_wait_info lwi;
                int i;
+               int update_idx = 0;
+               int updates_sz;
+               int updates_cnt;
+               u32 start_cat_idx;
+               u32 start_rec_idx;
+               struct hsm_record_update *updates;
+
+               /* Limit execution of the expensive requests traversal
+                * to at most one second. This prevents repeatedly
+                * locking/unlocking the catalog for each request
+                * and preventing other HSM operations from happening
+                */
+               wait_event_interruptible_timeout(cdt->cdt_waitq,
+                                                kthread_should_stop() ||
+                                                cdt->cdt_wakeup_coordinator,
+                                                cfs_time_seconds(1));
 
-               lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
-                                 NULL, NULL);
-               l_wait_event(cdt->cdt_thread.t_ctl_waitq,
-                            (cdt->cdt_thread.t_flags &
-                             (SVC_STOPPING|SVC_EVENT)),
-                            &lwi);
-
+               cdt->cdt_wakeup_coordinator = false;
                CDEBUG(D_HSM, "coordinator resumes\n");
 
-               if (cdt->cdt_thread.t_flags & SVC_STOPPING ||
-                   cdt->cdt_state == CDT_STOPPING) {
-                       cdt->cdt_thread.t_flags &= ~SVC_STOPPING;
+               if (kthread_should_stop()) {
+                       CDEBUG(D_HSM, "Coordinator stops\n");
                        rc = 0;
                        break;
                }
 
-               /* wake up before timeout, new work arrives */
-               if (cdt->cdt_thread.t_flags & SVC_EVENT)
-                       cdt->cdt_thread.t_flags &= ~SVC_EVENT;
-
                /* if coordinator is suspended continue to wait */
                if (cdt->cdt_state == CDT_DISABLE) {
                        CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
                        continue;
                }
 
+               /* If no event, and no housekeeping to do, continue to
+                * wait. */
+               if (last_housekeeping + cdt->cdt_loop_period <=
+                   ktime_get_real_seconds()) {
+                       last_housekeeping = ktime_get_real_seconds();
+                       hsd.hsd_housekeeping = true;
+                       start_cat_idx = 0;
+                       start_rec_idx = 0;
+               } else if (cdt->cdt_event) {
+                       hsd.hsd_housekeeping = false;
+                       start_cat_idx = hsd.hsd_start_cat_idx;
+                       start_rec_idx = hsd.hsd_start_rec_idx;
+               } else {
+                       continue;
+               }
+
+               cdt->cdt_event = false;
+
                CDEBUG(D_HSM, "coordinator starts reading llog\n");
 
-               if (hsd.max_requests != cdt->cdt_max_requests) {
+               if (hsd.hsd_request_len != cdt->cdt_max_requests) {
                        /* cdt_max_requests has changed,
                         * we need to allocate a new buffer
                         */
-                       OBD_FREE(hsd.request, hsd.request_sz);
-                       hsd.max_requests = cdt->cdt_max_requests;
-                       hsd.request_sz =
-                                  hsd.max_requests * sizeof(*hsd.request);
-                       OBD_ALLOC(hsd.request, hsd.request_sz);
-                       if (!hsd.request) {
-                               rc = -ENOMEM;
-                               break;
+                       struct hsm_scan_request *tmp = NULL;
+                       int max_requests = cdt->cdt_max_requests;
+                       OBD_ALLOC_LARGE(tmp, max_requests *
+                                       sizeof(struct hsm_scan_request));
+                       if (!tmp) {
+                               CERROR("Failed to resize request buffer, "
+                                      "keeping it at %d\n",
+                                      hsd.hsd_request_len);
+                       } else {
+                               if (hsd.hsd_request != NULL)
+                                       OBD_FREE_LARGE(hsd.hsd_request,
+                                                      request_sz);
+
+                               hsd.hsd_request_len = max_requests;
+                               request_sz = hsd.hsd_request_len *
+                                       sizeof(struct hsm_scan_request);
+                               hsd.hsd_request = tmp;
                        }
                }
 
-               /* create canceled cookie vector for an arbitrary size
-                * if needed, vector will grow during llog scan
-                */
-               hsd.max_cookie = 10;
-               hsd.cookie_cnt = 0;
-               OBD_ALLOC(hsd.cookies, hsd.max_cookie * sizeof(__u64));
-               if (!hsd.cookies) {
-                       rc = -ENOMEM;
-                       goto clean_cb_alloc;
-               }
-               hsd.request_cnt = 0;
+               hsd.hsd_action_count = 0;
+               hsd.hsd_request_count = 0;
+               hsd.hsd_one_restore = false;
 
-               rc = cdt_llog_process(mti->mti_env, mdt,
-                                     mdt_coordinator_cb, &hsd);
+               rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
+                                     &hsd, start_cat_idx, start_rec_idx,
+                                     WRITE);
                if (rc < 0)
                        goto clean_cb_alloc;
 
-               CDEBUG(D_HSM, "Found %d requests to send and %d"
-                             " requests to cancel\n",
-                      hsd.request_cnt, hsd.cookie_cnt);
-               /* first we cancel llog records of the timed out requests */
-               if (hsd.cookie_cnt > 0) {
-                       rc = mdt_agent_record_update(mti->mti_env, mdt,
-                                                    hsd.cookies,
-                                                    hsd.cookie_cnt,
-                                                    ARS_CANCELED);
-                       if (rc)
-                               CERROR("%s: mdt_agent_record_update() failed, "
-                                      "rc=%d, cannot update status to %s "
-                                      "for %d cookies\n",
-                                      mdt_obd_name(mdt), rc,
-                                      agent_req_status2name(ARS_CANCELED),
-                                      hsd.cookie_cnt);
-               }
+               CDEBUG(D_HSM, "found %d requests to send\n",
+                      hsd.hsd_request_count);
 
                if (list_empty(&cdt->cdt_agents)) {
                        CDEBUG(D_HSM, "no agent available, "
                                      "coordinator sleeps\n");
+                       /* reset HSM scanning index range. */
+                       hsd.hsd_start_cat_idx = start_cat_idx;
+                       hsd.hsd_start_rec_idx = start_rec_idx;
+                       goto clean_cb_alloc;
+               }
+
+               /* Compute how many HAI we have in all the requests */
+               updates_cnt = 0;
+               for (i = 0; i < hsd.hsd_request_count; i++) {
+                       const struct hsm_scan_request *request =
+                               &hsd.hsd_request[i];
+
+                       updates_cnt += request->hal->hal_count;
+               }
+
+               /* Allocate a temporary array to store the cookies to
+                * update, and their status. */
+               updates_sz = updates_cnt * sizeof(*updates);
+               OBD_ALLOC_LARGE(updates, updates_sz);
+               if (updates == NULL) {
+                       CERROR("%s: Cannot allocate memory (%d bytes) "
+                               "for %d updates. Too many HSM requests?\n",
+                              mdt_obd_name(mdt), updates_sz, updates_cnt);
                        goto clean_cb_alloc;
                }
 
                /* here hsd contains a list of requests to be started */
-               for (i = 0; i < hsd.max_requests; i++) {
-                       struct hsm_action_list  *hal;
+               for (i = 0; i < hsd.hsd_request_count; i++) {
+                       struct hsm_scan_request *request = &hsd.hsd_request[i];
+                       struct hsm_action_list  *hal = request->hal;
                        struct hsm_action_item  *hai;
-                       __u64                   *cookies;
-                       int                      sz, j;
-                       enum agent_req_status    status;
+                       int                      j;
 
                        /* still room for work ? */
-                       if (atomic_read(&cdt->cdt_request_count) ==
+                       if (atomic_read(&cdt->cdt_request_count) >=
                            cdt->cdt_max_requests)
                                break;
 
-                       if (hsd.request[i].hal == NULL)
-                               continue;
-
-                       /* found a request, we start it */
-                       /* kuc payload allocation so we avoid an additionnal
-                        * allocation in mdt_hsm_agent_send()
-                        */
-                       hal = kuc_alloc(hsd.request[i].hal_used_sz,
-                                       KUC_TRANSPORT_HSM, HMT_ACTION_LIST);
-                       if (IS_ERR(hal)) {
-                               CERROR("%s: Cannot allocate memory (%d o) "
-                                      "for compound "LPX64"\n",
-                                      mdt_obd_name(mdt),
-                                      hsd.request[i].hal_used_sz,
-                                      hsd.request[i].hal->hal_compound_id);
-                               continue;
-                       }
-                       memcpy(hal, hsd.request[i].hal,
-                              hsd.request[i].hal_used_sz);
-
                        rc = mdt_hsm_agent_send(mti, hal, 0);
                        /* if failure, we suppose it is temporary
                         * if the copy tool failed to do the request
                         * it has to use hsm_progress
                         */
-                       status = (rc ? ARS_WAITING : ARS_STARTED);
 
                        /* set up cookie vector to set records status
                         * after copy tools start or failed
                         */
-                       sz = hsd.request[i].hal->hal_count * sizeof(__u64);
-                       OBD_ALLOC(cookies, sz);
-                       if (cookies == NULL) {
-                               CERROR("%s: Cannot allocate memory (%d o) "
-                                      "for cookies vector "LPX64"\n",
-                                      mdt_obd_name(mdt), sz,
-                                      hsd.request[i].hal->hal_compound_id);
-                               kuc_free(hal, hsd.request[i].hal_used_sz);
-                               continue;
-                       }
                        hai = hai_first(hal);
-                       for (j = 0; j < hsd.request[i].hal->hal_count; j++) {
-                               cookies[j] = hai->hai_cookie;
+                       for (j = 0; j < hal->hal_count; j++) {
+                               updates[update_idx].cookie = hai->hai_cookie;
+                               updates[update_idx].status =
+                                       (rc ? ARS_WAITING : ARS_STARTED);
                                hai = hai_next(hai);
+                               update_idx++;
                        }
 
-                       rc = mdt_agent_record_update(mti->mti_env, mdt, cookies,
-                                               hsd.request[i].hal->hal_count,
-                                               status);
+                       /* TODO: narrow down the HSM action range that already
+                        * scanned accroding to the cookies when a failure
+                        * occurs.
+                        */
+                       if (rc) {
+                               hsd.hsd_start_cat_idx = start_cat_idx;
+                               hsd.hsd_start_rec_idx = start_rec_idx;
+                       }
+               }
+
+               if (update_idx) {
+                       rc = mdt_agent_record_update(mti, updates, update_idx);
                        if (rc)
                                CERROR("%s: mdt_agent_record_update() failed, "
-                                      "rc=%d, cannot update status to %s "
+                                      "rc=%d, cannot update records "
                                       "for %d cookies\n",
-                                      mdt_obd_name(mdt), rc,
-                                      agent_req_status2name(status),
-                                      hsd.request[i].hal->hal_count);
-
-                       OBD_FREE(cookies, sz);
-                       kuc_free(hal, hsd.request[i].hal_used_sz);
-               }
-clean_cb_alloc:
-               /* free cookie vector allocated for/by callback */
-               if (hsd.cookies) {
-                       OBD_FREE(hsd.cookies, hsd.max_cookie * sizeof(__u64));
-                       hsd.max_cookie = 0;
-                       hsd.cookie_cnt = 0;
-                       hsd.cookies = NULL;
+                                      mdt_obd_name(mdt), rc, update_idx);
                }
 
+               OBD_FREE_LARGE(updates, updates_sz);
+
+clean_cb_alloc:
                /* free hal allocated by callback */
-               for (i = 0; i < hsd.max_requests; i++) {
-                       if (hsd.request[i].hal) {
-                               OBD_FREE(hsd.request[i].hal,
-                                        hsd.request[i].hal_sz);
-                               hsd.request[i].hal_sz = 0;
-                               hsd.request[i].hal = NULL;
-                               hsd.request_cnt--;
-                       }
-               }
-               LASSERT(hsd.request_cnt == 0);
+               for (i = 0; i < hsd.hsd_request_count; i++) {
+                       struct hsm_scan_request *request = &hsd.hsd_request[i];
 
-               /* reset callback data */
-               memset(hsd.request, 0, hsd.request_sz);
+                       OBD_FREE_LARGE(request->hal, request->hal_sz);
+               }
        }
-       EXIT;
-out:
-       if (hsd.request)
-               OBD_FREE(hsd.request, hsd.request_sz);
 
-       if (hsd.cookies)
-               OBD_FREE(hsd.cookies, hsd.max_cookie * sizeof(__u64));
+       if (hsd.hsd_request != NULL)
+               OBD_FREE_LARGE(hsd.hsd_request, request_sz);
 
-       if (cdt->cdt_state == CDT_STOPPING) {
-               /* request comes from /proc path, so we need to clean cdt
-                * struct */
-                mdt_hsm_cdt_stop(mdt);
-                mdt->mdt_opts.mo_coordinator = 0;
-       } else {
-               /* request comes from a thread event, generated
-                * by mdt_stop_coordinator(), we have to ack
-                * and cdt cleaning will be done by event sender
-                */
-               cdt->cdt_thread.t_flags = SVC_STOPPED;
-               wake_up(&cdt->cdt_thread.t_ctl_waitq);
-       }
+       mdt_hsm_cdt_cleanup(mdt);
 
        if (rc != 0)
                CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
-                      mdt_obd_name(mdt), current_pid(), rc);
+                      mdt_obd_name(mdt), current->pid, rc);
        else
                CDEBUG(D_HSM, "%s: coordinator thread exiting, process=%d,"
                              " no error\n",
-                      mdt_obd_name(mdt), current_pid());
+                      mdt_obd_name(mdt), current->pid);
+
+       RETURN(rc);
+}
+
+/**
+ * register a new HSM restore handle for a file and take EX lock on the layout
+ * \param mti [IN] thread info
+ * \param cdt [IN] coordinator
+ * \param fid [IN] fid of the file to restore
+ * \param he  [IN] HSM extent
+ * \retval 0 success
+ * \retval 1 restore handle already exists for the fid
+ * \retval -ve failure
+ */
+int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
+                          const struct lu_fid *fid,
+                          const struct hsm_extent *he)
+{
+       struct mdt_lock_handle lh = { 0 };
+       struct cdt_restore_handle *crh;
+       struct mdt_object *obj;
+       int rc;
+       ENTRY;
+
+       OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
+       if (crh == NULL)
+               RETURN(-ENOMEM);
+
+       crh->crh_fid = *fid;
+       /* in V1 all file is restored
+        * crh->extent.start = he->offset;
+        * crh->extent.end = he->offset + he->length;
+        */
+       crh->crh_extent.start = 0;
+       crh->crh_extent.end = he->length;
+       crh->crh_lh.mlh_type = MDT_NUL_LOCK;
+
+       mutex_lock(&cdt->cdt_restore_lock);
+       if (cdt_restore_handle_find(cdt, fid) != NULL)
+               GOTO(out_crl, rc = 1);
+
+       if (unlikely(cdt->cdt_state == CDT_STOPPED ||
+                    cdt->cdt_state == CDT_STOPPING))
+               GOTO(out_crl, rc = -EAGAIN);
+
+       list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+       mutex_unlock(&cdt->cdt_restore_lock);
+
+       /* get the layout lock */
+       mdt_lock_reg_init(&lh, LCK_EX);
+       obj = mdt_object_find_lock(mti, &crh->crh_fid, &lh,
+                                  MDS_INODELOCK_LAYOUT);
+       if (IS_ERR(obj)) {
+               mutex_lock(&cdt->cdt_restore_lock);
+               GOTO(out_ldel, rc = PTR_ERR(obj));
+       }
+
+       /* We do not keep a reference on the object during the restore
+        * which can be very long.
+        */
+       mdt_object_put(mti->mti_env, obj);
+
+       mutex_lock(&cdt->cdt_restore_lock);
+       if (unlikely(cdt->cdt_state == CDT_STOPPED ||
+                    cdt->cdt_state == CDT_STOPPING))
+               GOTO(out_lh, rc = -EAGAIN);
+
+       crh->crh_lh = lh;
+       mutex_unlock(&cdt->cdt_restore_lock);
+
+       RETURN(0);
+out_lh:
+       mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
+out_ldel:
+       list_del(&crh->crh_list);
+out_crl:
+       mutex_unlock(&cdt->cdt_restore_lock);
+       OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
 
        return rc;
 }
@@ -704,19 +890,41 @@ out:
  * \retval cdt_restore_handle found
  * \retval NULL not found
  */
-static struct cdt_restore_handle *hsm_restore_hdl_find(struct coordinator *cdt,
-                                                      const struct lu_fid *fid)
+struct cdt_restore_handle *cdt_restore_handle_find(struct coordinator *cdt,
+                                                  const struct lu_fid *fid)
 {
-       struct cdt_restore_handle       *crh;
+       struct cdt_restore_handle *crh;
        ENTRY;
 
-       list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
+       list_for_each_entry(crh, &cdt->cdt_restore_handle_list, crh_list) {
                if (lu_fid_eq(&crh->crh_fid, fid))
                        RETURN(crh);
        }
+
        RETURN(NULL);
 }
 
+void cdt_restore_handle_del(struct mdt_thread_info *mti,
+                           struct coordinator *cdt, const struct lu_fid *fid)
+{
+       struct cdt_restore_handle *crh;
+
+       /* give back layout lock */
+       mutex_lock(&cdt->cdt_restore_lock);
+       crh = cdt_restore_handle_find(cdt, fid);
+       if (crh != NULL)
+               list_del(&crh->crh_list);
+       mutex_unlock(&cdt->cdt_restore_lock);
+
+       if (crh == NULL)
+               return;
+
+       /* XXX We pass a NULL object since the restore handle does not
+        * keep a reference on the object being restored. */
+       mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
+       OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+}
+
 /**
  * data passed to llog_cat_process() callback
  * to scan requests and take actions
@@ -741,11 +949,9 @@ static int hsm_restore_cb(const struct lu_env *env,
 {
        struct llog_agent_req_rec       *larr;
        struct hsm_restore_data         *hrd;
-       struct cdt_restore_handle       *crh;
        struct hsm_action_item          *hai;
        struct mdt_thread_info          *mti;
        struct coordinator              *cdt;
-       struct mdt_object               *child;
        int rc;
        ENTRY;
 
@@ -755,9 +961,10 @@ static int hsm_restore_cb(const struct lu_env *env,
 
        larr = (struct llog_agent_req_rec *)hdr;
        hai = &larr->arr_hai;
-       if (hai->hai_cookie > cdt->cdt_last_cookie)
+       if (hai->hai_cookie >= cdt->cdt_last_cookie) {
                /* update the cookie to avoid collision */
                cdt->cdt_last_cookie = hai->hai_cookie + 1;
+       }
 
        if (hai->hai_action != HSMA_RESTORE ||
            agent_req_in_final_state(larr->arr_status))
@@ -765,33 +972,20 @@ static int hsm_restore_cb(const struct lu_env *env,
 
        /* restore request not in a final state */
 
-       OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
-       if (crh == NULL)
-               RETURN(-ENOMEM);
-
-       crh->crh_fid = hai->hai_fid;
-       /* in V1 all file is restored
-       crh->extent.start = hai->hai_extent.offset;
-       crh->extent.end = hai->hai_extent.offset + hai->hai_extent.length;
-       */
-       crh->crh_extent.start = 0;
-       crh->crh_extent.end = hai->hai_extent.length;
-       /* get the layout lock */
-       mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
-       child = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
-                                    MDS_INODELOCK_LAYOUT);
-       if (IS_ERR(child))
-               GOTO(out, rc = PTR_ERR(child));
-
-       rc = 0;
-       /* we choose to not keep a reference
-        * on the object during the restore time which can be very long */
-       mdt_object_put(mti->mti_env, child);
-
-       mutex_lock(&cdt->cdt_restore_lock);
-       list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
-       mutex_unlock(&cdt->cdt_restore_lock);
+       /* force replay of restore requests left in started state from previous
+        * CDT context, to be canceled later if finally found to be incompatible
+        * when being re-started */
+       if (larr->arr_status == ARS_STARTED) {
+               larr->arr_status = ARS_WAITING;
+               larr->arr_req_change = ktime_get_real_seconds();
+               rc = llog_write(env, llh, hdr, hdr->lrh_index);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
 
+       rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid, &hai->hai_extent);
+       if (rc == 1)
+               rc = 0;
 out:
        RETURN(rc);
 }
@@ -809,16 +1003,15 @@ static int mdt_hsm_pending_restore(struct mdt_thread_info *mti)
 
        hrd.hrd_mti = mti;
 
-       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
-                             hsm_restore_cb, &hrd);
+       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, hsm_restore_cb, &hrd,
+                             0, 0, WRITE);
 
        RETURN(rc);
 }
 
-static int hsm_init_ucred(struct lu_ucred *uc)
+int hsm_init_ucred(struct lu_ucred *uc)
 {
        ENTRY;
-
        uc->uc_valid = UCRED_OLD;
        uc->uc_o_uid = 0;
        uc->uc_o_gid = 0;
@@ -830,31 +1023,12 @@ static int hsm_init_ucred(struct lu_ucred *uc)
        uc->uc_fsgid = 0;
        uc->uc_suppgids[0] = -1;
        uc->uc_suppgids[1] = -1;
-       uc->uc_cap = CFS_CAP_FS_MASK;
+       uc->uc_cap = cap_combine(CAP_FS_SET, CAP_NFSD_SET);
        uc->uc_umask = 0777;
        uc->uc_ginfo = NULL;
        uc->uc_identity = NULL;
-
-       RETURN(0);
-}
-
-/**
- * wake up coordinator thread
- * \param mdt [IN] device
- * \retval 0 success
- * \retval -ve failure
- */
-int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
-{
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
-       ENTRY;
-
-       if (cdt->cdt_state == CDT_STOPPED)
-               RETURN(-ESRCH);
-
-       /* wake up coordinator */
-       cdt->cdt_thread.t_flags = SVC_EVENT;
-       wake_up(&cdt->cdt_thread.t_ctl_waitq);
+       /* always record internal HSM activity if also enabled globally */
+       uc->uc_enable_audit = 1;
 
        RETURN(0);
 }
@@ -872,31 +1046,53 @@ int mdt_hsm_cdt_init(struct mdt_device *mdt)
        int                      rc;
        ENTRY;
 
-       cdt->cdt_state = CDT_STOPPED;
-
-       init_waitqueue_head(&cdt->cdt_thread.t_ctl_waitq);
-       mutex_init(&cdt->cdt_llog_lock);
+       init_waitqueue_head(&cdt->cdt_waitq);
+       init_rwsem(&cdt->cdt_llog_lock);
        init_rwsem(&cdt->cdt_agent_lock);
        init_rwsem(&cdt->cdt_request_lock);
        mutex_init(&cdt->cdt_restore_lock);
+       mutex_init(&cdt->cdt_state_lock);
+       set_cdt_state(cdt, CDT_STOPPED);
 
-       INIT_LIST_HEAD(&cdt->cdt_requests);
+       INIT_LIST_HEAD(&cdt->cdt_request_list);
        INIT_LIST_HEAD(&cdt->cdt_agents);
-       INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
+       INIT_LIST_HEAD(&cdt->cdt_restore_handle_list);
+
+       cdt->cdt_request_cookie_hash = cfs_hash_create("REQUEST_COOKIE_HASH",
+                                                      CFS_HASH_BITS_MIN,
+                                                      CFS_HASH_BITS_MAX,
+                                                      CFS_HASH_BKT_BITS,
+                                                      0 /* extra bytes */,
+                                                      CFS_HASH_MIN_THETA,
+                                                      CFS_HASH_MAX_THETA,
+                                               &cdt_request_cookie_hash_ops,
+                                                      CFS_HASH_DEFAULT);
+       if (cdt->cdt_request_cookie_hash == NULL)
+               RETURN(-ENOMEM);
+
+       cdt->cdt_agent_record_hash = cfs_hash_create("AGENT_RECORD_HASH",
+                                                    CFS_HASH_BITS_MIN,
+                                                    CFS_HASH_BITS_MAX,
+                                                    CFS_HASH_BKT_BITS,
+                                                    0 /* extra bytes */,
+                                                    CFS_HASH_MIN_THETA,
+                                                    CFS_HASH_MAX_THETA,
+                                                    &cdt_agent_record_hash_ops,
+                                                    CFS_HASH_DEFAULT);
+       if (cdt->cdt_agent_record_hash == NULL)
+               GOTO(out_request_cookie_hash, rc = -ENOMEM);
 
        rc = lu_env_init(&cdt->cdt_env, LCT_MD_THREAD);
        if (rc < 0)
-               RETURN(rc);
+               GOTO(out_agent_record_hash, rc);
 
        /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
        rc = lu_context_init(&cdt->cdt_session, LCT_SERVER_SESSION);
-       if (rc == 0) {
-               lu_context_enter(&cdt->cdt_session);
-               cdt->cdt_env.le_ses = &cdt->cdt_session;
-       } else {
-               lu_env_fini(&cdt->cdt_env);
-               RETURN(rc);
-       }
+       if (rc < 0)
+               GOTO(out_env, rc);
+
+       lu_context_enter(&cdt->cdt_session);
+       cdt->cdt_env.le_ses = &cdt->cdt_session;
 
        cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
        LASSERT(cdt_mti != NULL);
@@ -906,7 +1102,7 @@ int mdt_hsm_cdt_init(struct mdt_device *mdt)
 
        hsm_init_ucred(mdt_ucred(cdt_mti));
 
-       /* default values for /proc tunnables
+       /* default values for sysfs tunnables
         * can be override by MGS conf */
        cdt->cdt_default_archive_id = 1;
        cdt->cdt_grace_delay = 60;
@@ -915,7 +1111,21 @@ int mdt_hsm_cdt_init(struct mdt_device *mdt)
        cdt->cdt_policy = CDT_DEFAULT_POLICY;
        cdt->cdt_active_req_timeout = 3600;
 
+       /* by default do not remove archives on last unlink */
+       cdt->cdt_remove_archive_on_last_unlink = false;
+
        RETURN(0);
+
+out_env:
+       lu_env_fini(&cdt->cdt_env);
+out_agent_record_hash:
+       cfs_hash_putref(cdt->cdt_agent_record_hash);
+       cdt->cdt_agent_record_hash = NULL;
+out_request_cookie_hash:
+       cfs_hash_putref(cdt->cdt_request_cookie_hash);
+       cdt->cdt_request_cookie_hash = NULL;
+
+       return rc;
 }
 
 /**
@@ -932,6 +1142,12 @@ int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
 
        lu_env_fini(&cdt->cdt_env);
 
+       cfs_hash_putref(cdt->cdt_agent_record_hash);
+       cdt->cdt_agent_record_hash = NULL;
+
+       cfs_hash_putref(cdt->cdt_request_cookie_hash);
+       cdt->cdt_request_cookie_hash = NULL;
+
        RETURN(0);
 }
 
@@ -941,13 +1157,13 @@ int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
  * \retval 0 success
  * \retval -ve failure
  */
-int mdt_hsm_cdt_start(struct mdt_device *mdt)
+static int mdt_hsm_cdt_start(struct mdt_device *mdt)
 {
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
-       int                      rc;
-       void                    *ptr;
-       struct mdt_thread_info  *cdt_mti;
-       struct task_struct      *task;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       struct mdt_thread_info *cdt_mti;
+       int rc;
+       void *ptr;
+       struct task_struct *task;
        ENTRY;
 
        /* functions defined but not yet used
@@ -955,56 +1171,50 @@ int mdt_hsm_cdt_start(struct mdt_device *mdt)
         */
        ptr = dump_requests;
 
-       if (cdt->cdt_state != CDT_STOPPED) {
-               CERROR("%s: Coordinator already started\n",
+       rc = set_cdt_state(cdt, CDT_INIT);
+       if (rc) {
+               CERROR("%s: Coordinator already started or stopping\n",
                       mdt_obd_name(mdt));
                RETURN(-EALREADY);
        }
 
-       CLASSERT(1 << (CDT_POLICY_SHIFT_COUNT - 1) == CDT_POLICY_LAST);
+       BUILD_BUG_ON(BIT(CDT_POLICY_SHIFT_COUNT - 1) != CDT_POLICY_LAST);
        cdt->cdt_policy = CDT_DEFAULT_POLICY;
 
-       cdt->cdt_state = CDT_INIT;
-
-       atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
        /* just need to be larger than previous one */
        /* cdt_last_cookie is protected by cdt_llog_lock */
-       cdt->cdt_last_cookie = cfs_time_current_sec();
+       cdt->cdt_last_cookie = ktime_get_real_seconds();
        atomic_set(&cdt->cdt_request_count, 0);
+       atomic_set(&cdt->cdt_archive_count, 0);
+       atomic_set(&cdt->cdt_restore_count, 0);
+       atomic_set(&cdt->cdt_remove_count, 0);
        cdt->cdt_user_request_mask = (1UL << HSMA_RESTORE);
        cdt->cdt_group_request_mask = (1UL << HSMA_RESTORE);
        cdt->cdt_other_request_mask = (1UL << HSMA_RESTORE);
 
-       /* to avoid deadlock when start is made through /proc
-        * /proc entries are created by the coordinator thread */
+       /* to avoid deadlock when start is made through sysfs
+        * sysfs entries are created by the coordinator thread
+        */
+       if (mdt->mdt_bottom->dd_rdonly)
+               RETURN(0);
 
-       /* set up list of started restore requests */
        cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       rc = mdt_hsm_pending_restore(cdt_mti);
-       if (rc)
-               CERROR("%s: cannot take the layout locks needed"
-                      " for registered restore: %d\n",
-                      mdt_obd_name(mdt), rc);
-
        task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
-               cdt->cdt_state = CDT_STOPPED;
+               set_cdt_state(cdt, CDT_STOPPED);
                CERROR("%s: error starting coordinator thread: %d\n",
                       mdt_obd_name(mdt), rc);
-               RETURN(rc);
        } else {
+               cdt->cdt_task = task;
+               wait_event(cdt->cdt_waitq,
+                          cdt->cdt_state != CDT_INIT);
                CDEBUG(D_HSM, "%s: coordinator thread started\n",
                       mdt_obd_name(mdt));
                rc = 0;
        }
 
-       wait_event(cdt->cdt_thread.t_ctl_waitq,
-                      (cdt->cdt_thread.t_flags & SVC_RUNNING));
-
-       cdt->cdt_state = CDT_RUNNING;
-       mdt->mdt_opts.mo_coordinator = 1;
-       RETURN(0);
+       RETURN(rc);
 }
 
 /**
@@ -1013,63 +1223,45 @@ int mdt_hsm_cdt_start(struct mdt_device *mdt)
  */
 int mdt_hsm_cdt_stop(struct mdt_device *mdt)
 {
-       struct coordinator              *cdt = &mdt->mdt_coordinator;
-       struct cdt_agent_req            *car, *tmp1;
-       struct hsm_agent                *ha, *tmp2;
-       struct cdt_restore_handle       *crh, *tmp3;
-       struct mdt_thread_info          *cdt_mti;
-       ENTRY;
-
-       if (cdt->cdt_state == CDT_STOPPED) {
-               CERROR("%s: Coordinator already stopped\n",
-                      mdt_obd_name(mdt));
-               RETURN(-EALREADY);
-       }
-
-       if (cdt->cdt_state != CDT_STOPPING) {
-               /* stop coordinator thread before cleaning */
-               cdt->cdt_thread.t_flags = SVC_STOPPING;
-               wake_up(&cdt->cdt_thread.t_ctl_waitq);
-               wait_event(cdt->cdt_thread.t_ctl_waitq,
-                          cdt->cdt_thread.t_flags & SVC_STOPPED);
-       }
-       cdt->cdt_state = CDT_STOPPED;
-
-       /* start cleaning */
-       down_write(&cdt->cdt_request_lock);
-       list_for_each_entry_safe(car, tmp1, &cdt->cdt_requests,
-                                car_request_list) {
-               list_del(&car->car_request_list);
-               mdt_cdt_free_request(car);
-       }
-       up_write(&cdt->cdt_request_lock);
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       int rc;
 
-       down_write(&cdt->cdt_agent_lock);
-       list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
-               list_del(&ha->ha_list);
-               OBD_FREE_PTR(ha);
+       ENTRY;
+       /* stop coordinator thread */
+       rc = set_cdt_state(cdt, CDT_STOPPING);
+       if (rc == 0) {
+               kthread_stop(cdt->cdt_task);
+               cdt->cdt_task = NULL;
+               set_cdt_state(cdt, CDT_STOPPED);
        }
-       up_write(&cdt->cdt_agent_lock);
 
-       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       mutex_lock(&cdt->cdt_restore_lock);
-       list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
-               struct mdt_object       *child;
+       RETURN(rc);
+}
 
-               /* give back layout lock */
-               child = mdt_object_find(&cdt->cdt_env, mdt, &crh->crh_fid);
-               if (!IS_ERR(child))
-                       mdt_object_unlock_put(cdt_mti, child, &crh->crh_lh, 1);
+static int mdt_hsm_set_exists(struct mdt_thread_info *mti,
+                             const struct lu_fid *fid,
+                             u32 archive_id)
+{
+       struct mdt_object *obj;
+       struct md_hsm mh;
+       int rc;
 
-               list_del(&crh->crh_list);
+       obj = mdt_hsm_get_md_hsm(mti, fid, &mh);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
 
-               OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
-       }
-       mutex_unlock(&cdt->cdt_restore_lock);
+       if (mh.mh_flags & HS_EXISTS &&
+           mh.mh_arch_id == archive_id)
+               GOTO(out_obj, rc = 0);
 
-       mdt->mdt_opts.mo_coordinator = 0;
+       mh.mh_flags |= HS_EXISTS;
+       mh.mh_arch_id = archive_id;
+       rc = mdt_hsm_attr_set(mti, obj, &mh);
 
-       RETURN(0);
+out_obj:
+       mdt_object_put(mti->mti_env, obj);
+out:
+       return rc;
 }
 
 /**
@@ -1101,13 +1293,16 @@ int mdt_hsm_add_hal(struct mdt_thread_info *mti,
                 * it will be done when updating the request status
                 */
                if (hai->hai_action == HSMA_CANCEL) {
-                       rc = mdt_agent_record_update(mti->mti_env, mti->mti_mdt,
-                                                    &hai->hai_cookie,
-                                                    1, ARS_CANCELED);
+                       struct hsm_record_update update = {
+                               .cookie = hai->hai_cookie,
+                               .status = ARS_CANCELED,
+                       };
+
+                       rc = mdt_agent_record_update(mti, &update, 1);
                        if (rc) {
                                CERROR("%s: mdt_agent_record_update() failed, "
                                       "rc=%d, cannot update status to %s "
-                                      "for cookie "LPX64"\n",
+                                      "for cookie %#llx\n",
                                       mdt_obd_name(mdt), rc,
                                       agent_req_status2name(ARS_CANCELED),
                                       hai->hai_cookie);
@@ -1115,7 +1310,7 @@ int mdt_hsm_add_hal(struct mdt_thread_info *mti,
                        }
 
                        /* find the running request to set it canceled */
-                       car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
+                       car = mdt_cdt_find_request(cdt, hai->hai_cookie);
                        if (car != NULL) {
                                car->car_canceled = 1;
                                /* uuid has to be changed to the one running the
@@ -1131,25 +1326,15 @@ int mdt_hsm_add_hal(struct mdt_thread_info *mti,
                }
 
                if (hai->hai_action == HSMA_ARCHIVE) {
-                       struct mdt_object *obj;
-                       struct md_hsm hsm;
-
-                       obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
-                       if (IS_ERR(obj) && (PTR_ERR(obj) == -ENOENT))
+                       rc = mdt_hsm_set_exists(mti, &hai->hai_fid,
+                                               hal->hal_archive_id);
+                       if (rc == -ENOENT)
                                continue;
-                       if (IS_ERR(obj))
-                               GOTO(out, rc = PTR_ERR(obj));
-
-                       hsm.mh_flags |= HS_EXISTS;
-                       hsm.mh_arch_id = hal->hal_archive_id;
-                       rc = mdt_hsm_attr_set(mti, obj, &hsm);
-                       mdt_object_put(mti->mti_env, obj);
-                       if (rc)
+                       else if (rc < 0)
                                GOTO(out, rc);
                }
 
-               car = mdt_cdt_alloc_request(hal->hal_compound_id,
-                                           hal->hal_archive_id, hal->hal_flags,
+               car = mdt_cdt_alloc_request(hal->hal_archive_id, hal->hal_flags,
                                            uuid, hai);
                if (IS_ERR(car))
                        GOTO(out, rc = PTR_ERR(car));
@@ -1165,39 +1350,37 @@ out:
 /**
  * swap layouts between 2 fids
  * \param mti [IN] context
- * \param fid1 [IN]
- * \param fid2 [IN]
+ * \param obj [IN]
+ * \param dfid [IN]
  * \param mh_common [IN] MD HSM
  */
 static int hsm_swap_layouts(struct mdt_thread_info *mti,
-                           const lustre_fid *fid, const lustre_fid *dfid,
+                           struct mdt_object *obj, const struct lu_fid *dfid,
                            struct md_hsm *mh_common)
 {
-       struct mdt_device       *mdt = mti->mti_mdt;
-       struct mdt_object       *child1, *child2;
-       struct mdt_lock_handle  *lh2;
+       struct mdt_object       *dobj;
+       struct mdt_lock_handle  *dlh;
        int                      rc;
        ENTRY;
 
-       child1 = mdt_object_find(mti->mti_env, mdt, fid);
-       if (IS_ERR(child1))
-               GOTO(out, rc = PTR_ERR(child1));
+       if (!mdt_object_exists(obj))
+               GOTO(out, rc = -ENOENT);
 
-       /* we already have layout lock on FID so take only
+       /* we already have layout lock on obj so take only
         * on dfid */
-       lh2 = &mti->mti_lh[MDT_LH_OLD];
-       mdt_lock_reg_init(lh2, LCK_EX);
-       child2 = mdt_object_find_lock(mti, dfid, lh2, MDS_INODELOCK_LAYOUT);
-       if (IS_ERR(child2))
-               GOTO(out_child1, rc = PTR_ERR(child2));
+       dlh = &mti->mti_lh[MDT_LH_OLD];
+       mdt_lock_reg_init(dlh, LCK_EX);
+       dobj = mdt_object_find_lock(mti, dfid, dlh, MDS_INODELOCK_LAYOUT);
+       if (IS_ERR(dobj))
+               GOTO(out, rc = PTR_ERR(dobj));
 
        /* if copy tool closes the volatile before sending the final
         * progress through llapi_hsm_copy_end(), all the objects
         * are removed and mdd_swap_layout LBUG */
-       if (!mdt_object_exists(child2)) {
+       if (!mdt_object_exists(dobj)) {
                CERROR("%s: Copytool has closed volatile file "DFID"\n",
                       mdt_obd_name(mti->mti_mdt), PFID(dfid));
-               GOTO(out_child2, rc = -ENOENT);
+               GOTO(out_dobj, rc = -ENOENT);
        }
        /* Since we only handle restores here, unconditionally use
         * SWAP_LAYOUTS_MDS_HSM flag to ensure original layout will
@@ -1208,17 +1391,23 @@ static int hsm_swap_layouts(struct mdt_thread_info *mti,
         * only need to clear RELEASED and DIRTY.
         */
        mh_common->mh_flags &= ~(HS_RELEASED | HS_DIRTY);
-       rc = mdt_hsm_attr_set(mti, child2, mh_common);
+       rc = mdt_hsm_attr_set(mti, dobj, mh_common);
        if (rc == 0)
                rc = mo_swap_layouts(mti->mti_env,
-                                    mdt_object_child(child1),
-                                    mdt_object_child(child2),
+                                    mdt_object_child(obj),
+                                    mdt_object_child(dobj),
                                     SWAP_LAYOUTS_MDS_HSM);
-
-out_child2:
-       mdt_object_unlock_put(mti, child2, lh2, 1);
-out_child1:
-       mdt_object_put(mti->mti_env, child1);
+       if (rc == 0) {
+               rc = mdt_lsom_downgrade(mti, obj);
+               if (rc)
+                       CDEBUG(D_INODE,
+                              "%s: File fid="DFID" SOM "
+                              "downgrade failed, rc = %d\n",
+                              mdt_obd_name(mti->mti_mdt),
+                              PFID(mdt_object_fid(obj)), rc);
+       }
+out_dobj:
+       mdt_object_unlock_put(mti, dobj, dlh, 1);
 out:
        RETURN(rc);
 }
@@ -1227,7 +1416,6 @@ out:
  * update status of a completed request
  * \param mti [IN] context
  * \param pgs [IN] progress of the copy tool
- * \param update_record [IN] update llog record
  * \retval 0 success
  * \retval -ve failure
  */
@@ -1236,27 +1424,25 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                                     const struct cdt_agent_req *car,
                                     enum agent_req_status *status)
 {
-       const struct lu_env     *env = mti->mti_env;
-       struct mdt_device       *mdt = mti->mti_mdt;
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
-       struct mdt_object       *obj = NULL;
-       int                      cl_flags = 0, rc = 0;
-       struct md_hsm            mh;
-       bool                     is_mh_changed;
-       ENTRY;
+       const struct lu_env *env = mti->mti_env;
+       struct mdt_device *mdt = mti->mti_mdt;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       struct mdt_object *obj = NULL;
+       enum changelog_rec_flags clf_flags = 0;
+       struct md_hsm mh;
+       bool is_mh_changed;
+       bool need_changelog = true;
+       int rc = 0;
 
+       ENTRY;
        /* default is to retry */
        *status = ARS_WAITING;
 
-       /* find object by FID */
+       /* find object by FID, mdt_hsm_get_md_hsm() returns obj or err
+        * if error/removed continue anyway to get correct reporting done */
        obj = mdt_hsm_get_md_hsm(mti, &car->car_hai->hai_fid, &mh);
        /* we will update MD HSM only if needed */
        is_mh_changed = false;
-       if (IS_ERR(obj)) {
-               /* object removed */
-               *status = ARS_SUCCEED;
-               goto unlock;
-       }
 
        /* no need to change mh->mh_arch_id
         * mdt_hsm_get_md_hsm() got it from disk and it is still valid
@@ -1284,45 +1470,46 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                        *status = ARS_SUCCEED;
                        break;
                default:
+                       /* retry only if current policy or requested, and
+                        * object is not on error/removed */
                        *status = (cdt->cdt_policy & CDT_NORETRY_ACTION ||
-                                  !(pgs->hpk_flags & HP_FLAG_RETRY) ?
-                                  ARS_FAILED : ARS_WAITING);
+                                  !(pgs->hpk_flags & HP_FLAG_RETRY) ||
+                                  IS_ERR(obj)) ? ARS_FAILED : ARS_WAITING;
                        break;
                }
 
                if (pgs->hpk_errval > CLF_HSM_MAXERROR) {
-                       CERROR("%s: Request "LPX64" on "DFID
+                       CERROR("%s: Request %#llx on "DFID
                               " failed, error code %d too large\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie, PFID(&pgs->hpk_fid),
                               pgs->hpk_errval);
-                       hsm_set_cl_error(&cl_flags,
-                                        CLF_HSM_ERROVERFLOW);
+                       hsm_set_cl_error(&clf_flags, CLF_HSM_ERROVERFLOW);
                        rc = -EINVAL;
                } else {
-                       hsm_set_cl_error(&cl_flags, pgs->hpk_errval);
+                       hsm_set_cl_error(&clf_flags, pgs->hpk_errval);
                }
 
                switch (car->car_hai->hai_action) {
                case HSMA_ARCHIVE:
-                       hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
+                       hsm_set_cl_event(&clf_flags, HE_ARCHIVE);
                        break;
                case HSMA_RESTORE:
-                       hsm_set_cl_event(&cl_flags, HE_RESTORE);
+                       hsm_set_cl_event(&clf_flags, HE_RESTORE);
                        break;
                case HSMA_REMOVE:
-                       hsm_set_cl_event(&cl_flags, HE_REMOVE);
+                       hsm_set_cl_event(&clf_flags, HE_REMOVE);
                        break;
                case HSMA_CANCEL:
-                       hsm_set_cl_event(&cl_flags, HE_CANCEL);
-                       CERROR("%s: Failed request "LPX64" on "DFID
+                       hsm_set_cl_event(&clf_flags, HE_CANCEL);
+                       CERROR("%s: Failed request %#llx on "DFID
                               " cannot be a CANCEL\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie,
                               PFID(&pgs->hpk_fid));
                        break;
                default:
-                       CERROR("%s: Failed request "LPX64" on "DFID
+                       CERROR("%s: Failed request %#llx on "DFID
                               " %d is an unknown action\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie, PFID(&pgs->hpk_fid),
@@ -1334,7 +1521,7 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                *status = ARS_SUCCEED;
                switch (car->car_hai->hai_action) {
                case HSMA_ARCHIVE:
-                       hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
+                       hsm_set_cl_event(&clf_flags, HE_ARCHIVE);
                        /* set ARCHIVE keep EXIST and clear LOST and
                         * DIRTY */
                        mh.mh_arch_ver = pgs->hpk_data_version;
@@ -1343,7 +1530,7 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                        is_mh_changed = true;
                        break;
                case HSMA_RESTORE:
-                       hsm_set_cl_event(&cl_flags, HE_RESTORE);
+                       hsm_set_cl_event(&clf_flags, HE_RESTORE);
 
                        /* do not clear RELEASED and DIRTY here
                         * this will occur in hsm_swap_layouts()
@@ -1355,24 +1542,20 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                        is_mh_changed = true;
                        break;
                case HSMA_REMOVE:
-                       hsm_set_cl_event(&cl_flags, HE_REMOVE);
+                       hsm_set_cl_event(&clf_flags, HE_REMOVE);
                        /* clear ARCHIVED EXISTS and LOST */
                        mh.mh_flags &= ~(HS_ARCHIVED | HS_EXISTS | HS_LOST);
                        is_mh_changed = true;
                        break;
                case HSMA_CANCEL:
-                       hsm_set_cl_event(&cl_flags, HE_CANCEL);
-                       CERROR("%s: Successful request "LPX64
-                              " on "DFID
-                              " cannot be a CANCEL\n",
+                       hsm_set_cl_event(&clf_flags, HE_CANCEL);
+                       CERROR("%s: Successful request %#llx on "DFID" cannot be a CANCEL\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie,
                               PFID(&pgs->hpk_fid));
                        break;
                default:
-                       CERROR("%s: Successful request "LPX64
-                              " on "DFID
-                              " %d is an unknown action\n",
+                       CERROR("%s: Successful request %#llx on "DFID" %d is an unknown action\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie, PFID(&pgs->hpk_fid),
                               car->car_hai->hai_action);
@@ -1383,29 +1566,28 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
 
        /* rc != 0 means error when analysing action, it may come from
         * a crasy CT no need to manage DIRTY
+        * and if mdt_hsm_get_md_hsm() has returned an error, mh has not been
+        * filled
         */
-       if (rc == 0)
-               hsm_set_cl_flags(&cl_flags,
+       if (rc == 0 && !IS_ERR(obj))
+               hsm_set_cl_flags(&clf_flags,
                                 mh.mh_flags & HS_DIRTY ? CLF_HSM_DIRTY : 0);
 
        /* unlock is done later, after layout lock management */
-       if (is_mh_changed)
+       if (is_mh_changed && !IS_ERR(obj))
                rc = mdt_hsm_attr_set(mti, obj, &mh);
 
-unlock:
        /* we give back layout lock only if restore was successful or
-        * if restore was canceled or if policy is to not retry
+        * if no retry will be attempted and if object is still alive,
         * in other cases we just unlock the object */
-       if (car->car_hai->hai_action == HSMA_RESTORE &&
-           (pgs->hpk_errval == 0 || pgs->hpk_errval == ECANCELED ||
-            cdt->cdt_policy & CDT_NORETRY_ACTION)) {
-               struct cdt_restore_handle       *crh;
+       if (car->car_hai->hai_action == HSMA_RESTORE) {
+               struct mdt_lock_handle *lh;
 
                /* restore in data FID done, we swap the layouts
                 * only if restore is successful */
-               if (pgs->hpk_errval == 0) {
-                       rc = hsm_swap_layouts(mti, &car->car_hai->hai_fid,
-                                             &car->car_hai->hai_dfid, &mh);
+               if (pgs->hpk_errval == 0 && !IS_ERR(obj)) {
+                       rc = hsm_swap_layouts(mti, obj, &car->car_hai->hai_dfid,
+                                             &mh);
                        if (rc) {
                                if (cdt->cdt_policy & CDT_NORETRY_ACTION)
                                        *status = ARS_FAILED;
@@ -1416,30 +1598,33 @@ unlock:
                if (*status == ARS_WAITING)
                        GOTO(out, rc);
 
-               /* give back layout lock */
-               mutex_lock(&cdt->cdt_restore_lock);
-               crh = hsm_restore_hdl_find(cdt, &car->car_hai->hai_fid);
-               if (crh != NULL)
-                       list_del(&crh->crh_list);
-               mutex_unlock(&cdt->cdt_restore_lock);
-               /* just give back layout lock, we keep
-                * the reference which is given back
-                * later with the lock for HSM flags */
-               if (!IS_ERR(obj) && crh != NULL)
-                       mdt_object_unlock(mti, obj, &crh->crh_lh, 1);
-
-               if (crh != NULL)
-                       OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+               /* restore special case, need to create ChangeLog record
+                * before to give back layout lock to avoid concurrent
+                * file updater to post out of order ChangeLog */
+               mo_changelog(env, CL_HSM, clf_flags, mdt->mdt_child,
+                            &car->car_hai->hai_fid);
+               need_changelog = false;
+
+               cdt_restore_handle_del(mti, cdt, &car->car_hai->hai_fid);
+               if (!IS_ERR_OR_NULL(obj)) {
+                       /* flush UPDATE lock so attributes are upadated */
+                       lh = &mti->mti_lh[MDT_LH_OLD];
+                       mdt_lock_reg_init(lh, LCK_EX);
+                       mdt_object_lock(mti, obj, lh, MDS_INODELOCK_UPDATE);
+                       mdt_object_unlock(mti, obj, lh, 1);
+               }
        }
 
        GOTO(out, rc);
 
 out:
-       if (obj != NULL && !IS_ERR(obj)) {
-               mo_changelog(env, CL_HSM, cl_flags,
-                            mdt_object_child(obj));
+       /* always add a ChangeLog record */
+       if (need_changelog)
+               mo_changelog(env, CL_HSM, clf_flags, mdt->mdt_child,
+                            &car->car_hai->hai_fid);
+
+       if (!IS_ERR(obj))
                mdt_object_put(mti->mti_env, obj);
-       }
 
        RETURN(rc);
 }
@@ -1448,13 +1633,11 @@ out:
  * update status of a request
  * \param mti [IN] context
  * \param pgs [IN] progress of the copy tool
- * \param update_record [IN] update llog record
  * \retval 0 success
  * \retval -ve failure
  */
 int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
-                                struct hsm_progress_kernel *pgs,
-                                const int update_record)
+                                struct hsm_progress_kernel *pgs)
 {
        struct mdt_device       *mdt = mti->mti_mdt;
        struct coordinator      *cdt = &mdt->mdt_coordinator;
@@ -1469,16 +1652,15 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
        /* first do sanity checks */
        car = mdt_cdt_update_request(cdt, pgs);
        if (IS_ERR(car)) {
-               CERROR("%s: Cannot find running request for cookie "LPX64
+               CERROR("%s: Cannot find running request for cookie %#llx"
                       " on fid="DFID"\n",
                       mdt_obd_name(mdt),
                       pgs->hpk_cookie, PFID(&pgs->hpk_fid));
-               if (car == NULL)
-                       RETURN(-ENOENT);
+
                RETURN(PTR_ERR(car));
        }
 
-       CDEBUG(D_HSM, "Progress received for fid="DFID" cookie="LPX64
+       CDEBUG(D_HSM, "Progress received for fid="DFID" cookie=%#llx"
                      " action=%s flags=%d err=%d fid="DFID" dfid="DFID"\n",
                      PFID(&pgs->hpk_fid), pgs->hpk_cookie,
                      hsm_copytool_action2name(car->car_hai->hai_action),
@@ -1497,7 +1679,7 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
             car->car_hai->hai_action == HSMA_ARCHIVE) &&
            (!lu_fid_eq(&pgs->hpk_fid, &car->car_hai->hai_dfid) &&
             !lu_fid_eq(&pgs->hpk_fid, &car->car_hai->hai_fid))) {
-               CERROR("%s: Progress on "DFID" for cookie "LPX64
+               CERROR("%s: Progress on "DFID" for cookie %#llx"
                       " does not match request FID "DFID" nor data FID "
                       DFID"\n",
                       mdt_obd_name(mdt),
@@ -1508,7 +1690,7 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
        }
 
        if (pgs->hpk_errval != 0 && !(pgs->hpk_flags & HP_FLAG_COMPLETED)) {
-               CERROR("%s: Progress on "DFID" for cookie "LPX64" action=%s"
+               CERROR("%s: Progress on "DFID" for cookie %#llx action=%s"
                       " is not coherent (err=%d and not completed"
                       " (flags=%d))\n",
                       mdt_obd_name(mdt),
@@ -1524,37 +1706,38 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
        hsm_init_ucred(mdt_ucred(mti));
 
        if (pgs->hpk_flags & HP_FLAG_COMPLETED) {
-               enum agent_req_status    status;
+               enum agent_req_status status;
+               struct hsm_record_update update;
+               int rc1;
 
                rc = hsm_cdt_request_completed(mti, pgs, car, &status);
 
-               /* remove request from memory list */
-               mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
-
-               CDEBUG(D_HSM, "Updating record: fid="DFID" cookie="LPX64
-                             " action=%s status=%s\n", PFID(&pgs->hpk_fid),
-                      pgs->hpk_cookie,
+               CDEBUG(D_HSM, "updating record: fid="DFID" cookie=%#llx action=%s "
+                             "status=%s\n",
+                      PFID(&pgs->hpk_fid), pgs->hpk_cookie,
                       hsm_copytool_action2name(car->car_hai->hai_action),
                       agent_req_status2name(status));
 
-               if (update_record) {
-                       int rc1;
-
-                       rc1 = mdt_agent_record_update(mti->mti_env, mdt,
-                                                    &pgs->hpk_cookie, 1,
-                                                    status);
-                       if (rc1)
-                               CERROR("%s: mdt_agent_record_update() failed,"
-                                      " rc=%d, cannot update status to %s"
-                                      " for cookie "LPX64"\n",
-                                      mdt_obd_name(mdt), rc1,
-                                      agent_req_status2name(status),
-                                      pgs->hpk_cookie);
-                       rc = (rc != 0 ? rc : rc1);
-               }
-               /* ct has completed a request, so a slot is available, wakeup
-                * cdt to find new work */
-               mdt_hsm_cdt_wakeup(mdt);
+               /* update record first (LU-9075) */
+               update.cookie = pgs->hpk_cookie;
+               update.status = status;
+
+               rc1 = mdt_agent_record_update(mti, &update, 1);
+               if (rc1)
+                       CERROR("%s: mdt_agent_record_update() failed,"
+                              " rc=%d, cannot update status to %s"
+                              " for cookie %#llx\n",
+                              mdt_obd_name(mdt), rc1,
+                              agent_req_status2name(status),
+                              pgs->hpk_cookie);
+               rc = (rc != 0 ? rc : rc1);
+
+               /* then remove request from memory list (LU-9075) */
+               mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
+
+               /* ct has completed a request, so a slot is available,
+                * signal the coordinator to find new work */
+               mdt_hsm_cdt_event(cdt);
        } else {
                /* if copytool send a progress on a canceled request
                 * we inform copytool it should stop
@@ -1573,20 +1756,12 @@ out:
 
 
 /**
- * data passed to llog_cat_process() callback
- * to cancel requests
- */
-struct hsm_cancel_all_data {
-       struct mdt_device       *mdt;
-};
-
-/**
  *  llog_cat_process() callback, used to:
  *  - purge all requests
  * \param env [IN] environment
  * \param llh [IN] llog handle
  * \param hdr [IN] llog record
- * \param data [IN] cb data = struct hsm_cancel_all_data
+ * \param data [IN] cb data = struct mdt_thread_info
  * \retval 0 success
  * \retval -ve failure
  */
@@ -1594,21 +1769,30 @@ static int mdt_cancel_all_cb(const struct lu_env *env,
                             struct llog_handle *llh,
                             struct llog_rec_hdr *hdr, void *data)
 {
-       struct llog_agent_req_rec       *larr;
-       struct hsm_cancel_all_data      *hcad;
-       int                              rc = 0;
+       struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
+       struct hsm_action_item *hai = &larr->arr_hai;
+       struct mdt_thread_info  *mti = data;
+       struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
+       int rc;
        ENTRY;
 
-       larr = (struct llog_agent_req_rec *)hdr;
-       hcad = data;
-       if (larr->arr_status == ARS_WAITING ||
-           larr->arr_status == ARS_STARTED) {
-               larr->arr_status = ARS_CANCELED;
-               larr->arr_req_change = cfs_time_current_sec();
-               rc = mdt_agent_llog_update_rec(env, hcad->mdt, llh, larr);
-               if (rc == 0)
-                       RETURN(LLOG_DEL_RECORD);
+       if (larr->arr_status != ARS_WAITING &&
+           larr->arr_status != ARS_STARTED)
+               RETURN(0);
+
+       /* Unlock the EX layout lock */
+       if (hai->hai_action == HSMA_RESTORE)
+               cdt_restore_handle_del(mti, cdt, &hai->hai_fid);
+
+       larr->arr_status = ARS_CANCELED;
+       larr->arr_req_change = ktime_get_real_seconds();
+       rc = llog_write(env, llh, hdr, hdr->lrh_index);
+       if (rc < 0) {
+               CERROR("%s: cannot update agent log: rc = %d\n",
+                      mdt_obd_name(mti->mti_mdt), rc);
+               rc = LLOG_DEL_RECORD;
        }
+
        RETURN(rc);
 }
 
@@ -1618,26 +1802,48 @@ static int mdt_cancel_all_cb(const struct lu_env *env,
  */
 static int hsm_cancel_all_actions(struct mdt_device *mdt)
 {
+       struct lu_env                    env;
+       struct lu_context                session;
        struct mdt_thread_info          *mti;
        struct coordinator              *cdt = &mdt->mdt_coordinator;
        struct cdt_agent_req            *car;
        struct hsm_action_list          *hal = NULL;
        struct hsm_action_item          *hai;
-       struct hsm_cancel_all_data       hcad;
        int                              hal_sz = 0, hal_len, rc;
-       enum cdt_states                  save_state;
+       enum cdt_states                  old_state;
        ENTRY;
 
-       /* retrieve coordinator context */
-       mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc < 0)
+               RETURN(rc);
+
+       /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
+       rc = lu_context_init(&session, LCT_SERVER_SESSION);
+       if (rc < 0)
+               GOTO(out_env, rc);
+
+       lu_context_enter(&session);
+       env.le_ses = &session;
+
+       mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+       LASSERT(mti != NULL);
+
+       mti->mti_env = &env;
+       mti->mti_mdt = mdt;
+
+       hsm_init_ucred(mdt_ucred(mti));
+
+       mutex_lock(&cdt->cdt_state_lock);
+       old_state = cdt->cdt_state;
 
        /* disable coordinator */
-       save_state = cdt->cdt_state;
-       cdt->cdt_state = CDT_DISABLE;
+       rc = set_cdt_state_locked(cdt, CDT_DISABLE);
+       if (rc)
+               GOTO(out_cdt_state_unlock, rc);
 
        /* send cancel to all running requests */
        down_read(&cdt->cdt_request_lock);
-       list_for_each_entry(car, &cdt->cdt_requests, car_request_list) {
+       list_for_each_entry(car, &cdt->cdt_request_list, car_request_list) {
                mdt_cdt_get_request(car);
                /* request is not yet removed from list, it will be done
                 * when copytool will return progress
@@ -1665,7 +1871,7 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
                        if (hal == NULL) {
                                mdt_cdt_put_request(car);
                                up_read(&cdt->cdt_request_lock);
-                               GOTO(out, rc = -ENOMEM);
+                               GOTO(out_cdt_state, rc = -ENOMEM);
                        }
                }
 
@@ -1673,7 +1879,6 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
                obd_uuid2fsname(hal->hal_fsname, mdt_obd_name(mdt),
                                MTI_NAME_MAXLEN);
                hal->hal_fsname[MTI_NAME_MAXLEN] = '\0';
-               hal->hal_compound_id = car->car_compound_id;
                hal->hal_archive_id = car->car_archive_id;
                hal->hal_flags = car->car_flags;
                hal->hal_count = 0;
@@ -1701,13 +1906,18 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
                OBD_FREE(hal, hal_sz);
 
        /* cancel all on-disk records */
-       hcad.mdt = mdt;
-
-       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
-                             mdt_cancel_all_cb, &hcad);
-out:
-       /* enable coordinator */
-       cdt->cdt_state = save_state;
+       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, mdt_cancel_all_cb,
+                             (void *)mti, 0, 0, WRITE);
+out_cdt_state:
+       /* Enable coordinator, unless the coordinator was stopping. */
+       set_cdt_state_locked(cdt, old_state);
+out_cdt_state_unlock:
+       mutex_unlock(&cdt->cdt_state_lock);
+
+       lu_context_exit(&session);
+       lu_context_fini(&session);
+out_env:
+       lu_env_fini(&env);
 
        RETURN(rc);
 }
@@ -1715,13 +1925,13 @@ out:
 /**
  * check if a request is compatible with file status
  * \param hai [IN] request description
- * \param hal_an [IN] request archive number (not used)
+ * \param archive_id [IN] request archive id
  * \param rq_flags [IN] request flags
  * \param hsm [IN] file HSM metadata
  * \retval boolean
  */
 bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
-                             const int hal_an, const __u64 rq_flags,
+                             u32 archive_id, u64 rq_flags,
                              const struct md_hsm *hsm)
 {
        int      is_compat = false;
@@ -1734,6 +1944,12 @@ bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
                if (!(hsm_flags & HS_NOARCHIVE) &&
                    (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED)))
                        is_compat = true;
+
+               if (hsm_flags & HS_EXISTS &&
+                   archive_id != 0 &&
+                   archive_id != hsm->mh_arch_id)
+                       is_compat = false;
+
                break;
        case HSMA_RESTORE:
                if (!(hsm_flags & HS_DIRTY) && (hsm_flags & HS_RELEASED) &&
@@ -1749,8 +1965,8 @@ bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
                is_compat = true;
                break;
        }
-       CDEBUG(D_HSM, "fid="DFID" action=%s flags="LPX64
-                     " extent="LPX64"-"LPX64" hsm_flags=%.8X %s\n",
+       CDEBUG(D_HSM, "fid="DFID" action=%s flags=%#llx"
+                     " extent=%#llx-%#llx hsm_flags=%.8X %s\n",
                      PFID(&hai->hai_fid),
                      hsm_copytool_action2name(hai->hai_action), rq_flags,
                      hai->hai_extent.offset, hai->hai_extent.length,
@@ -1761,7 +1977,7 @@ bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
 }
 
 /*
- * /proc interface used to get/set HSM behaviour (cdt->cdt_policy)
+ * sysfs interface used to get/set HSM behaviour (cdt->cdt_policy)
  */
 static const struct {
        __u64            bit;
@@ -1805,7 +2021,7 @@ static void hsm_policy_bit2str(struct seq_file *m, const __u64 mask,
        ENTRY;
 
        if (hexa)
-               seq_printf(m, "("LPX64") ", mask);
+               seq_printf(m, "(%#llx) ", mask);
 
        for (i = 0; i < CDT_POLICY_SHIFT_COUNT; i++) {
                bit = (1ULL << i);
@@ -1821,7 +2037,7 @@ static void hsm_policy_bit2str(struct seq_file *m, const __u64 mask,
        }
        /* remove last ' ' */
        m->count--;
-       seq_putc(m, '\0');
+       seq_putc(m, '\n');
 }
 
 /* methods to read/write HSM policy flags */
@@ -1898,7 +2114,7 @@ mdt_hsm_policy_seq_write(struct file *file, const char __user *buffer,
 
        } while (start != NULL);
 
-       CDEBUG(D_HSM, "%s: new policy: rm="LPX64" add="LPX64" set="LPX64"\n",
+       CDEBUG(D_HSM, "%s: new policy: rm=%#llx add=%#llx set=%#llx\n",
               mdt_obd_name(mdt), remove_mask, add_mask, set_mask);
 
        /* if no sign in all string, it is a clear and set
@@ -1920,45 +2136,149 @@ out:
        OBD_FREE(buf, count + 1);
        RETURN(rc);
 }
-LPROC_SEQ_FOPS(mdt_hsm_policy);
-
-#define GENERATE_PROC_METHOD(VAR)                                      \
-static int mdt_hsm_##VAR##_seq_show(struct seq_file *m, void *data)    \
-{                                                                      \
-       struct mdt_device       *mdt = m->private;                      \
-       struct coordinator      *cdt = &mdt->mdt_coordinator;           \
-       ENTRY;                                                          \
-                                                                       \
-       seq_printf(m, LPU64"\n", (__u64)cdt->VAR);                      \
-       RETURN(0);                                                      \
-}                                                                      \
-static ssize_t                                                         \
-mdt_hsm_##VAR##_seq_write(struct file *file, const char __user *buffer,        \
-                         size_t count, loff_t *off)                    \
-                                                                       \
-{                                                                      \
-       struct seq_file         *m = file->private_data;                \
-       struct mdt_device       *mdt = m->private;                      \
-       struct coordinator      *cdt = &mdt->mdt_coordinator;           \
-       int                      val;                                   \
-       int                      rc;                                    \
-       ENTRY;                                                          \
-                                                                       \
-       rc = lprocfs_write_helper(buffer, count, &val);                 \
-       if (rc)                                                         \
-               RETURN(rc);                                             \
-       if (val > 0) {                                                  \
-               cdt->VAR = val;                                         \
-               RETURN(count);                                          \
-       }                                                               \
-       RETURN(-EINVAL);                                                \
-}                                                                      \
-
-GENERATE_PROC_METHOD(cdt_loop_period)
-GENERATE_PROC_METHOD(cdt_grace_delay)
-GENERATE_PROC_METHOD(cdt_active_req_timeout)
-GENERATE_PROC_METHOD(cdt_max_requests)
-GENERATE_PROC_METHOD(cdt_default_archive_id)
+LDEBUGFS_SEQ_FOPS(mdt_hsm_policy);
+
+ssize_t loop_period_show(struct kobject *kobj, struct attribute *attr,
+                        char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n", cdt->cdt_loop_period);
+}
+
+ssize_t loop_period_store(struct kobject *kobj, struct attribute *attr,
+                         const char *buffer, size_t count)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+       unsigned int val;
+       int rc;
+
+       rc = kstrtouint(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       if (val != 0)
+               cdt->cdt_loop_period = val;
+
+       return val ? count : -EINVAL;
+}
+LUSTRE_RW_ATTR(loop_period);
+
+ssize_t grace_delay_show(struct kobject *kobj, struct attribute *attr,
+                        char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n", cdt->cdt_grace_delay);
+}
+
+ssize_t grace_delay_store(struct kobject *kobj, struct attribute *attr,
+                         const char *buffer, size_t count)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+       unsigned int val;
+       int rc;
+
+       rc = kstrtouint(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       if (val != 0)
+               cdt->cdt_grace_delay = val;
+
+       return val ? count : -EINVAL;
+}
+LUSTRE_RW_ATTR(grace_delay);
+
+ssize_t active_request_timeout_show(struct kobject *kobj,
+                                   struct attribute *attr,
+                                   char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%d\n", cdt->cdt_active_req_timeout);
+}
+
+ssize_t active_request_timeout_store(struct kobject *kobj,
+                                    struct attribute *attr,
+                                    const char *buffer, size_t count)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+       unsigned int val;
+       int rc;
+
+       rc = kstrtouint(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       if (val != 0)
+               cdt->cdt_active_req_timeout = val;
+
+       return val ? count : -EINVAL;
+}
+LUSTRE_RW_ATTR(active_request_timeout);
+
+ssize_t max_requests_show(struct kobject *kobj, struct attribute *attr,
+                         char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%llu\n", cdt->cdt_max_requests);
+}
+
+ssize_t max_requests_store(struct kobject *kobj, struct attribute *attr,
+                          const char *buffer, size_t count)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+       unsigned long long val;
+       int rc;
+
+       rc = kstrtoull(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       if (val != 0)
+               cdt->cdt_max_requests = val;
+
+       return val ? count : -EINVAL;
+}
+LUSTRE_RW_ATTR(max_requests);
+
+ssize_t default_archive_id_show(struct kobject *kobj, struct attribute *attr,
+                               char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n", cdt->cdt_default_archive_id);
+}
+
+ssize_t default_archive_id_store(struct kobject *kobj, struct attribute *attr,
+                                const char *buffer, size_t count)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+       unsigned int val;
+       int rc;
+
+       rc = kstrtouint(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       if (val != 0)
+               cdt->cdt_default_archive_id = val;
+
+       return val ? count : -EINVAL;
+}
+LUSTRE_RW_ATTR(default_archive_id);
 
 /*
  * procfs write method for MDT/hsm_control
@@ -1971,57 +2291,55 @@ GENERATE_PROC_METHOD(cdt_default_archive_id)
 #define CDT_HELP_CMD     "help"
 #define CDT_MAX_CMD_LEN  10
 
-ssize_t
-mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
-                             size_t count, loff_t *off)
+ssize_t hsm_control_store(struct kobject *kobj, struct attribute *attr,
+                         const char *buffer, size_t count)
 {
-       struct seq_file         *m = file->private_data;
-       struct obd_device       *obd = m->private;
-       struct mdt_device       *mdt = mdt_dev(obd->obd_lu_dev);
-       struct coordinator      *cdt = &(mdt->mdt_coordinator);
-       int                      rc, usage = 0;
-       char                     kernbuf[CDT_MAX_CMD_LEN];
-       ENTRY;
-
-       if (count == 0 || count >= sizeof(kernbuf))
-               RETURN(-EINVAL);
-
-       if (copy_from_user(kernbuf, buffer, count))
-               RETURN(-EFAULT);
-       kernbuf[count] = 0;
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       struct coordinator *cdt = &(mdt->mdt_coordinator);
+       int usage = 0;
+       int rc = 0;
 
-       if (kernbuf[count - 1] == '\n')
-               kernbuf[count - 1] = 0;
+       if (count == 0 || count >= CDT_MAX_CMD_LEN)
+               return -EINVAL;
 
-       rc = 0;
-       if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
+       if (strncmp(buffer, CDT_ENABLE_CMD, strlen(CDT_ENABLE_CMD)) == 0) {
                if (cdt->cdt_state == CDT_DISABLE) {
-                       cdt->cdt_state = CDT_RUNNING;
-                       mdt_hsm_cdt_wakeup(mdt);
+                       rc = set_cdt_state(cdt, CDT_RUNNING);
+                       mdt_hsm_cdt_event(cdt);
+                       wake_up(&cdt->cdt_waitq);
+               } else if (cdt->cdt_state == CDT_RUNNING) {
+                       rc = 0;
                } else {
                        rc = mdt_hsm_cdt_start(mdt);
                }
-       } else if (strcmp(kernbuf, CDT_STOP_CMD) == 0) {
-               if ((cdt->cdt_state == CDT_STOPPING) ||
-                   (cdt->cdt_state == CDT_STOPPED)) {
-                       CERROR("%s: Coordinator already stopped\n",
+       } else if (strncmp(buffer, CDT_STOP_CMD, strlen(CDT_STOP_CMD)) == 0) {
+               if (cdt->cdt_state == CDT_STOPPING) {
+                       CERROR("%s: Coordinator is already stopping\n",
                               mdt_obd_name(mdt));
                        rc = -EALREADY;
+               } else if (cdt->cdt_state == CDT_STOPPED) {
+                       rc = 0;
                } else {
-                       cdt->cdt_state = CDT_STOPPING;
+                       rc = mdt_hsm_cdt_stop(mdt);
                }
-       } else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
+       } else if (strncmp(buffer, CDT_DISABLE_CMD,
+                          strlen(CDT_DISABLE_CMD)) == 0) {
                if ((cdt->cdt_state == CDT_STOPPING) ||
                    (cdt->cdt_state == CDT_STOPPED)) {
-                       CERROR("%s: Coordinator is stopped\n",
-                              mdt_obd_name(mdt));
-                       rc = -EINVAL;
+                       /* exit gracefully if coordinator is being stopped
+                        * or stopped already.
+                        */
+                       rc = 0;
                } else {
-                       cdt->cdt_state = CDT_DISABLE;
+                       rc = set_cdt_state(cdt, CDT_DISABLE);
                }
-       } else if (strcmp(kernbuf, CDT_PURGE_CMD) == 0) {
+       } else if (strncmp(buffer, CDT_PURGE_CMD,
+                          strlen(CDT_PURGE_CMD)) == 0) {
                rc = hsm_cancel_all_actions(mdt);
-       } else if (strcmp(kernbuf, CDT_HELP_CMD) == 0) {
+       } else if (strncmp(buffer, CDT_HELP_CMD,
+                          strlen(CDT_HELP_CMD)) == 0) {
                usage = 1;
        } else {
                usage = 1;
@@ -2040,28 +2358,17 @@ mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
        RETURN(count);
 }
 
-int mdt_hsm_cdt_control_seq_show(struct seq_file *m, void *data)
+ssize_t hsm_control_show(struct kobject *kobj, struct attribute *attr,
+                        char *buf)
 {
-       struct obd_device       *obd = m->private;
-       struct coordinator      *cdt;
-       ENTRY;
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct coordinator *cdt;
 
        cdt = &(mdt_dev(obd->obd_lu_dev)->mdt_coordinator);
 
-       if (cdt->cdt_state == CDT_INIT)
-               seq_printf(m, "init\n");
-       else if (cdt->cdt_state == CDT_RUNNING)
-               seq_printf(m, "enabled\n");
-       else if (cdt->cdt_state == CDT_STOPPING)
-               seq_printf(m, "stopping\n");
-       else if (cdt->cdt_state == CDT_STOPPED)
-               seq_printf(m, "stopped\n");
-       else if (cdt->cdt_state == CDT_DISABLE)
-               seq_printf(m, "disabled\n");
-       else
-               seq_printf(m, "unknown\n");
-
-       RETURN(0);
+       return scnprintf(buf, PAGE_SIZE, "%s\n",
+                        cdt_mdt_state2str(cdt->cdt_state));
 }
 
 static int
@@ -2209,33 +2516,82 @@ mdt_hsm_other_request_mask_seq_write(struct file *file, const char __user *buf,
                                           &cdt->cdt_other_request_mask);
 }
 
-LPROC_SEQ_FOPS(mdt_hsm_cdt_loop_period);
-LPROC_SEQ_FOPS(mdt_hsm_cdt_grace_delay);
-LPROC_SEQ_FOPS(mdt_hsm_cdt_active_req_timeout);
-LPROC_SEQ_FOPS(mdt_hsm_cdt_max_requests);
-LPROC_SEQ_FOPS(mdt_hsm_cdt_default_archive_id);
-LPROC_SEQ_FOPS(mdt_hsm_user_request_mask);
-LPROC_SEQ_FOPS(mdt_hsm_group_request_mask);
-LPROC_SEQ_FOPS(mdt_hsm_other_request_mask);
+static ssize_t remove_archive_on_last_unlink_show(struct kobject *kobj,
+                                                 struct attribute *attr,
+                                                 char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n",
+                        cdt->cdt_remove_archive_on_last_unlink);
+}
+
+static ssize_t remove_archive_on_last_unlink_store(struct kobject *kobj,
+                                                  struct attribute *attr,
+                                                  const char *buffer,
+                                                  size_t count)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+       bool val;
+       int rc;
+
+       rc = kstrtobool(buffer, &val);
+       if (rc < 0)
+               return rc;
+
+       cdt->cdt_remove_archive_on_last_unlink = val;
+       return count;
+}
+LUSTRE_RW_ATTR(remove_archive_on_last_unlink);
+
+LDEBUGFS_SEQ_FOPS(mdt_hsm_user_request_mask);
+LDEBUGFS_SEQ_FOPS(mdt_hsm_group_request_mask);
+LDEBUGFS_SEQ_FOPS(mdt_hsm_other_request_mask);
+
+/* Read-only sysfs files for request counters */
+static ssize_t archive_count_show(struct kobject *kobj, struct attribute *attr,
+                                 char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%d\n",
+                        atomic_read(&cdt->cdt_archive_count));
+}
+LUSTRE_RO_ATTR(archive_count);
+
+static ssize_t restore_count_show(struct kobject *kobj, struct attribute *attr,
+                                 char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%d\n",
+                        atomic_read(&cdt->cdt_restore_count));
+}
+LUSTRE_RO_ATTR(restore_count);
+
+static ssize_t remove_count_show(struct kobject *kobj, struct attribute *attr,
+                                char *buf)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%d\n",
+                        atomic_read(&cdt->cdt_remove_count));
+}
+LUSTRE_RO_ATTR(remove_count);
 
-static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
+static struct ldebugfs_vars ldebugfs_mdt_hsm_vars[] = {
        { .name =       "agents",
          .fops =       &mdt_hsm_agent_fops                     },
        { .name =       "actions",
          .fops =       &mdt_hsm_actions_fops,
          .proc_mode =  0444                                    },
-       { .name =       "default_archive_id",
-         .fops =       &mdt_hsm_cdt_default_archive_id_fops    },
-       { .name =       "grace_delay",
-         .fops =       &mdt_hsm_cdt_grace_delay_fops           },
-       { .name =       "loop_period",
-         .fops =       &mdt_hsm_cdt_loop_period_fops           },
-       { .name =       "max_requests",
-         .fops =       &mdt_hsm_cdt_max_requests_fops          },
        { .name =       "policy",
          .fops =       &mdt_hsm_policy_fops                    },
-       { .name =       "active_request_timeout",
-         .fops =       &mdt_hsm_cdt_active_req_timeout_fops    },
        { .name =       "active_requests",
          .fops =       &mdt_hsm_active_requests_fops           },
        { .name =       "user_request_mask",
@@ -2246,3 +2602,74 @@ static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
          .fops =       &mdt_hsm_other_request_mask_fops,       },
        { 0 }
 };
+
+static struct attribute *hsm_attrs[] = {
+       &lustre_attr_loop_period.attr,
+       &lustre_attr_grace_delay.attr,
+       &lustre_attr_active_request_timeout.attr,
+       &lustre_attr_max_requests.attr,
+       &lustre_attr_default_archive_id.attr,
+       &lustre_attr_remove_archive_on_last_unlink.attr,
+       &lustre_attr_archive_count.attr,
+       &lustre_attr_restore_count.attr,
+       &lustre_attr_remove_count.attr,
+       NULL,
+};
+
+static void hsm_kobj_release(struct kobject *kobj)
+{
+       struct coordinator *cdt = container_of(kobj, struct coordinator,
+                                              cdt_hsm_kobj);
+
+       debugfs_remove_recursive(cdt->cdt_debugfs_dir);
+       cdt->cdt_debugfs_dir = NULL;
+
+       complete(&cdt->cdt_kobj_unregister);
+}
+
+static struct kobj_type hsm_ktype = {
+       .default_attrs  = hsm_attrs,
+       .sysfs_ops      = &lustre_sysfs_ops,
+       .release        = hsm_kobj_release,
+};
+
+/**
+ * create sysfs entries for coordinator
+ * \param mdt [IN]
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int hsm_cdt_tunables_init(struct mdt_device *mdt)
+{
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       struct obd_device *obd = mdt2obd_dev(mdt);
+       int rc;
+
+       init_completion(&cdt->cdt_kobj_unregister);
+       rc = kobject_init_and_add(&cdt->cdt_hsm_kobj, &hsm_ktype,
+                                 &obd->obd_kset.kobj, "%s", "hsm");
+       if (rc) {
+               kobject_put(&cdt->cdt_hsm_kobj);
+               return rc;
+       }
+
+       /* init debugfs entries, failure is not critical */
+       cdt->cdt_debugfs_dir = debugfs_create_dir("hsm",
+                                                 obd->obd_debugfs_entry);
+       ldebugfs_add_vars(cdt->cdt_debugfs_dir, ldebugfs_mdt_hsm_vars, mdt);
+
+       return 0;
+}
+
+/**
+ * remove sysfs entries for coordinator
+ *
+ * @mdt
+ */
+void hsm_cdt_tunables_fini(struct mdt_device *mdt)
+{
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+
+       kobject_put(&cdt->cdt_hsm_kobj);
+       wait_for_completion(&cdt->cdt_kobj_unregister);
+}