Whamcloud - gitweb
LU-7988 hsm: run HSM coordinator once per second at most
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
index c6253e8..bf04033 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2011, 2012 Commissariat a l'energie atomique et aux energies
  *                          alternatives
  *
- * Copyright (c) 2013, 2014, Intel Corporation.
+ * Copyright (c) 2013, 2016, Intel Corporation.
  * Use is subject to license terms.
  */
 /*
@@ -40,7 +40,6 @@
 
 #include <linux/kthread.h>
 #include <obd_support.h>
-#include <lustre_net.h>
 #include <lustre_export.h>
 #include <obd.h>
 #include <lprocfs_status.h>
@@ -145,6 +144,10 @@ struct hsm_scan_data {
        struct hsm_scan_request         *request;
 };
 
+struct hsm_thread_data {
+       struct mdt_thread_info  *cdt_mti;
+       struct hsm_scan_request *request;
+};
 /**
  *  llog_cat_process() callback, used to:
  *  - find waiting request and start action
@@ -270,6 +273,11 @@ static int mdt_coordinator_cb(const struct lu_env *env,
 
                request->hal_used_sz += cfs_size_round(hai->hai_len);
                request->hal->hal_count++;
+
+               if (hai->hai_action != HSMA_CANCEL)
+                       cdt_agent_record_hash_add(cdt, hai->hai_cookie,
+                                                 llh->lgh_hdr->llh_cat_idx,
+                                                 hdr->lrh_index);
                break;
        }
        case ARS_STARTED: {
@@ -282,9 +290,9 @@ static int mdt_coordinator_cb(const struct lu_env *env,
                 * error may happen if coordinator crashes or stopped
                 * with running request
                 */
-               car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie, NULL);
+               car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie);
                if (car == NULL) {
-                       last = larr->arr_req_create;
+                       last = larr->arr_req_change;
                } else {
                        last = car->car_req_update;
                        mdt_cdt_put_request(car);
@@ -325,6 +333,8 @@ static int mdt_coordinator_cb(const struct lu_env *env,
                         * to the client, for which an error will be
                         * sent back, leading to an endless cycle of
                         * cancellation. */
+                       cdt_agent_record_hash_del(cdt,
+                                                 larr->arr_hai.hai_cookie);
                        RETURN(LLOG_DEL_RECORD);
                }
 
@@ -344,8 +354,11 @@ static int mdt_coordinator_cb(const struct lu_env *env,
        case ARS_CANCELED:
        case ARS_SUCCEED:
                if ((larr->arr_req_change + cdt->cdt_grace_delay) <
-                   cfs_time_current_sec())
+                   cfs_time_current_sec()) {
+                       cdt_agent_record_hash_del(cdt,
+                                                 larr->arr_hai.hai_cookie);
                        RETURN(LLOG_DEL_RECORD);
+               }
                break;
        }
        RETURN(0);
@@ -382,11 +395,10 @@ int hsm_cdt_procfs_init(struct mdt_device *mdt)
  * remove /proc entries for coordinator
  * \param mdt [IN]
  */
-void  hsm_cdt_procfs_fini(struct mdt_device *mdt)
+void hsm_cdt_procfs_fini(struct mdt_device *mdt)
 {
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
 
-       LASSERT(cdt->cdt_state == CDT_STOPPED);
        if (cdt->cdt_proc_dir != NULL)
                lprocfs_remove(&cdt->cdt_proc_dir);
 }
@@ -401,6 +413,94 @@ struct lprocfs_vars *hsm_cdt_get_proc_vars(void)
        return lprocfs_mdt_hsm_vars;
 }
 
+/* Release the ressource used by the coordinator. Called when the
+ * coordinator is stopping. */
+static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
+{
+       struct coordinator              *cdt = &mdt->mdt_coordinator;
+       struct cdt_agent_req            *car, *tmp1;
+       struct hsm_agent                *ha, *tmp2;
+       struct cdt_restore_handle       *crh, *tmp3;
+       struct mdt_thread_info          *cdt_mti;
+
+       /* start cleaning */
+       down_write(&cdt->cdt_request_lock);
+       list_for_each_entry_safe(car, tmp1, &cdt->cdt_request_list,
+                                car_request_list) {
+               cfs_hash_del(cdt->cdt_request_cookie_hash,
+                            &car->car_hai->hai_cookie,
+                            &car->car_cookie_hash);
+               list_del(&car->car_request_list);
+               mdt_cdt_put_request(car);
+       }
+       up_write(&cdt->cdt_request_lock);
+
+       down_write(&cdt->cdt_agent_lock);
+       list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
+               list_del(&ha->ha_list);
+               OBD_FREE_PTR(ha);
+       }
+       up_write(&cdt->cdt_agent_lock);
+
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       mutex_lock(&cdt->cdt_restore_lock);
+       list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
+               list_del(&crh->crh_list);
+               /* give back layout lock */
+               mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
+               OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+       }
+       mutex_unlock(&cdt->cdt_restore_lock);
+}
+
+/*
+ * Coordinator state transition table, indexed on enum cdt_states, taking
+ * from and to states. For instance since CDT_INIT to CDT_RUNNING is a
+ * valid transition, cdt_transition[CDT_INIT][CDT_RUNNING] is true.
+ */
+static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
+       /* from -> to:    stopped init   running disable stopping */
+       /* stopped */   { true,   true,  false,  false,  false },
+       /* init */      { true,   false, true,   false,  false },
+       /* running */   { false,  false, true,   true,   true },
+       /* disable */   { false,  false, true,   true,   true },
+       /* stopping */  { true,   false, false,  false,  false }
+};
+
+/**
+ * Change coordinator thread state
+ * Some combinations are not valid, so catch them here.
+ *
+ * Returns 0 on success, with old_state set if not NULL, or -EINVAL if
+ * the transition was not possible.
+ */
+static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state,
+                        enum cdt_states *old_state)
+{
+       int rc;
+       enum cdt_states state;
+
+       spin_lock(&cdt->cdt_state_lock);
+
+       state = cdt->cdt_state;
+
+       if (cdt_transition[state][new_state]) {
+               cdt->cdt_state = new_state;
+               spin_unlock(&cdt->cdt_state_lock);
+               if (old_state)
+                       *old_state = state;
+               rc = 0;
+       } else {
+               spin_unlock(&cdt->cdt_state_lock);
+               CDEBUG(D_HSM,
+                      "unexpected coordinator transition, from=%s, to=%s\n",
+                      cdt_mdt_state2str(state), cdt_mdt_state2str(new_state));
+               rc = -EINVAL;
+       }
+
+       return rc;
+}
+
 /**
  * coordinator thread
  * \param data [IN] obd device
@@ -409,83 +509,95 @@ struct lprocfs_vars *hsm_cdt_get_proc_vars(void)
  */
 static int mdt_coordinator(void *data)
 {
-       struct mdt_thread_info  *mti = data;
+       struct hsm_thread_data  *thread_data = data;
+       struct mdt_thread_info  *mti = thread_data->cdt_mti;
        struct mdt_device       *mdt = mti->mti_mdt;
        struct coordinator      *cdt = &mdt->mdt_coordinator;
        struct hsm_scan_data     hsd = { NULL };
+       time64_t                 wait_event_time = 1 * HZ;
+       time64_t                 last_housekeeping = 0;
        int                      rc = 0;
        int                      request_sz;
        ENTRY;
 
-       cdt->cdt_thread.t_flags = SVC_RUNNING;
-       wake_up(&cdt->cdt_thread.t_ctl_waitq);
+       /* set up hsd->request and max_requests */
+       hsd.max_requests = cdt->cdt_max_requests;
+       request_sz = hsd.max_requests * sizeof(*hsd.request);
+       hsd.request = thread_data->request;
 
        CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
               mdt_obd_name(mdt), current_pid());
 
-       /* we use a copy of cdt_max_requests in the cb, so if cdt_max_requests
-        * increases due to a change from /proc we do not overflow the
-        * hsd.request[] vector
-        */
-       hsd.max_requests = cdt->cdt_max_requests;
-       request_sz = hsd.max_requests * sizeof(*hsd.request);
-       OBD_ALLOC(hsd.request, request_sz);
-       if (!hsd.request)
-               GOTO(out, rc = -ENOMEM);
-
        hsd.mti = mti;
        obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
 
+       set_cdt_state(cdt, CDT_RUNNING, NULL);
+
+       /* Inform mdt_hsm_cdt_start(). */
+       wake_up_all(&cdt->cdt_waitq);
+
        while (1) {
-               struct l_wait_info lwi;
                int i;
 
-               lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
-                                 NULL, NULL);
-               l_wait_event(cdt->cdt_thread.t_ctl_waitq,
-                            (cdt->cdt_thread.t_flags &
-                             (SVC_STOPPING|SVC_EVENT)),
-                            &lwi);
+               /* Limit execution of the expensive requests traversal
+                * to at most every "wait_event_time" jiffies. This prevents
+                * repeatedly locking/unlocking the catalog for each request
+                * and preventing other HSM operations from happening */
+               wait_event_interruptible_timeout(cdt->cdt_waitq,
+                                                kthread_should_stop(),
+                                                wait_event_time);
 
                CDEBUG(D_HSM, "coordinator resumes\n");
 
-               if (cdt->cdt_thread.t_flags & SVC_STOPPING ||
-                   cdt->cdt_state == CDT_STOPPING) {
-                       cdt->cdt_thread.t_flags &= ~SVC_STOPPING;
+               if (kthread_should_stop()) {
+                       CDEBUG(D_HSM, "Coordinator stops\n");
                        rc = 0;
                        break;
                }
 
-               /* wake up before timeout, new work arrives */
-               if (cdt->cdt_thread.t_flags & SVC_EVENT)
-                       cdt->cdt_thread.t_flags &= ~SVC_EVENT;
-
                /* if coordinator is suspended continue to wait */
                if (cdt->cdt_state == CDT_DISABLE) {
                        CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
                        continue;
                }
 
+               /* If no event, and no housekeeping to do, continue to
+                * wait. */
+               if (last_housekeeping + cdt->cdt_loop_period <= get_seconds())
+                       last_housekeeping = get_seconds();
+               else if (!cdt->cdt_event)
+                       continue;
+
+               cdt->cdt_event = false;
+
                CDEBUG(D_HSM, "coordinator starts reading llog\n");
 
                if (hsd.max_requests != cdt->cdt_max_requests) {
                        /* cdt_max_requests has changed,
                         * we need to allocate a new buffer
                         */
-                       OBD_FREE(hsd.request, request_sz);
-                       hsd.max_requests = cdt->cdt_max_requests;
-                       request_sz = hsd.max_requests * sizeof(*hsd.request);
-                       OBD_ALLOC(hsd.request, request_sz);
-                       if (!hsd.request) {
-                               rc = -ENOMEM;
-                               break;
+                       struct hsm_scan_request *tmp = NULL;
+                       int max_requests = cdt->cdt_max_requests;
+                       OBD_ALLOC_LARGE(tmp, max_requests *
+                                       sizeof(struct hsm_scan_request));
+                       if (!tmp) {
+                               CERROR("Failed to resize request buffer, "
+                                      "keeping it at %d\n",
+                                      hsd.max_requests);
+                               cdt->cdt_max_requests = hsd.max_requests;
+                       } else {
+                               OBD_FREE_LARGE(hsd.request, request_sz);
+                               hsd.max_requests = max_requests;
+                               request_sz = hsd.max_requests *
+                                       sizeof(struct hsm_scan_request);
+                               hsd.request = tmp;
                        }
                }
 
                hsd.request_cnt = 0;
 
-               rc = cdt_llog_process(mti->mti_env, mdt,
-                                     mdt_coordinator_cb, &hsd);
+               rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
+                                     &hsd, 0, 0, WRITE);
                if (rc < 0)
                        goto clean_cb_alloc;
 
@@ -552,24 +664,11 @@ clean_cb_alloc:
                        OBD_FREE(request->hal, request->hal_sz);
                }
        }
-       EXIT;
-out:
+
        if (hsd.request)
-               OBD_FREE(hsd.request, request_sz);
+               OBD_FREE_LARGE(hsd.request, request_sz);
 
-       if (cdt->cdt_state == CDT_STOPPING) {
-               /* request comes from /proc path, so we need to clean cdt
-                * struct */
-                mdt_hsm_cdt_stop(mdt);
-                mdt->mdt_opts.mo_coordinator = 0;
-       } else {
-               /* request comes from a thread event, generated
-                * by mdt_stop_coordinator(), we have to ack
-                * and cdt cleaning will be done by event sender
-                */
-               cdt->cdt_thread.t_flags = SVC_STOPPED;
-               wake_up(&cdt->cdt_thread.t_ctl_waitq);
-       }
+       mdt_hsm_cdt_cleanup(mdt);
 
        if (rc != 0)
                CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
@@ -579,7 +678,7 @@ out:
                              " no error\n",
                       mdt_obd_name(mdt), current_pid());
 
-       return rc;
+       RETURN(rc);
 }
 
 /**
@@ -706,8 +805,8 @@ static int mdt_hsm_pending_restore(struct mdt_thread_info *mti)
 
        hrd.hrd_mti = mti;
 
-       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
-                             hsm_restore_cb, &hrd);
+       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, hsm_restore_cb, &hrd,
+                             0, 0, WRITE);
 
        RETURN(rc);
 }
@@ -736,27 +835,6 @@ static int hsm_init_ucred(struct lu_ucred *uc)
 }
 
 /**
- * wake up coordinator thread
- * \param mdt [IN] device
- * \retval 0 success
- * \retval -ve failure
- */
-int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
-{
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
-       ENTRY;
-
-       if (cdt->cdt_state == CDT_STOPPED)
-               RETURN(-ESRCH);
-
-       /* wake up coordinator */
-       cdt->cdt_thread.t_flags = SVC_EVENT;
-       wake_up(&cdt->cdt_thread.t_ctl_waitq);
-
-       RETURN(0);
-}
-
-/**
  * initialize coordinator struct
  * \param mdt [IN] device
  * \retval 0 success
@@ -769,31 +847,53 @@ int mdt_hsm_cdt_init(struct mdt_device *mdt)
        int                      rc;
        ENTRY;
 
-       cdt->cdt_state = CDT_STOPPED;
-
-       init_waitqueue_head(&cdt->cdt_thread.t_ctl_waitq);
-       mutex_init(&cdt->cdt_llog_lock);
+       init_waitqueue_head(&cdt->cdt_waitq);
+       init_rwsem(&cdt->cdt_llog_lock);
        init_rwsem(&cdt->cdt_agent_lock);
        init_rwsem(&cdt->cdt_request_lock);
        mutex_init(&cdt->cdt_restore_lock);
+       spin_lock_init(&cdt->cdt_state_lock);
+       set_cdt_state(cdt, CDT_STOPPED, NULL);
 
-       INIT_LIST_HEAD(&cdt->cdt_requests);
+       INIT_LIST_HEAD(&cdt->cdt_request_list);
        INIT_LIST_HEAD(&cdt->cdt_agents);
        INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
 
+       cdt->cdt_request_cookie_hash = cfs_hash_create("REQUEST_COOKIE_HASH",
+                                                      CFS_HASH_BITS_MIN,
+                                                      CFS_HASH_BITS_MAX,
+                                                      CFS_HASH_BKT_BITS,
+                                                      0 /* extra bytes */,
+                                                      CFS_HASH_MIN_THETA,
+                                                      CFS_HASH_MAX_THETA,
+                                               &cdt_request_cookie_hash_ops,
+                                                      CFS_HASH_DEFAULT);
+       if (cdt->cdt_request_cookie_hash == NULL)
+               RETURN(-ENOMEM);
+
+       cdt->cdt_agent_record_hash = cfs_hash_create("AGENT_RECORD_HASH",
+                                                    CFS_HASH_BITS_MIN,
+                                                    CFS_HASH_BITS_MAX,
+                                                    CFS_HASH_BKT_BITS,
+                                                    0 /* extra bytes */,
+                                                    CFS_HASH_MIN_THETA,
+                                                    CFS_HASH_MAX_THETA,
+                                                    &cdt_agent_record_hash_ops,
+                                                    CFS_HASH_DEFAULT);
+       if (cdt->cdt_agent_record_hash == NULL)
+               GOTO(out_request_cookie_hash, rc = -ENOMEM);
+
        rc = lu_env_init(&cdt->cdt_env, LCT_MD_THREAD);
        if (rc < 0)
-               RETURN(rc);
+               GOTO(out_agent_record_hash, rc);
 
        /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
        rc = lu_context_init(&cdt->cdt_session, LCT_SERVER_SESSION);
-       if (rc == 0) {
-               lu_context_enter(&cdt->cdt_session);
-               cdt->cdt_env.le_ses = &cdt->cdt_session;
-       } else {
-               lu_env_fini(&cdt->cdt_env);
-               RETURN(rc);
-       }
+       if (rc < 0)
+               GOTO(out_env, rc);
+
+       lu_context_enter(&cdt->cdt_session);
+       cdt->cdt_env.le_ses = &cdt->cdt_session;
 
        cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
        LASSERT(cdt_mti != NULL);
@@ -812,7 +912,25 @@ int mdt_hsm_cdt_init(struct mdt_device *mdt)
        cdt->cdt_policy = CDT_DEFAULT_POLICY;
        cdt->cdt_active_req_timeout = 3600;
 
+       /* Initialize cdt_compound_id here to allow its usage for
+        * delayed requests from RAoLU policy */
+       atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
+
+       /* by default do not remove archives on last unlink */
+       cdt->cdt_remove_archive_on_last_unlink = false;
+
        RETURN(0);
+
+out_env:
+       lu_env_fini(&cdt->cdt_env);
+out_agent_record_hash:
+       cfs_hash_putref(cdt->cdt_agent_record_hash);
+       cdt->cdt_agent_record_hash = NULL;
+out_request_cookie_hash:
+       cfs_hash_putref(cdt->cdt_request_cookie_hash);
+       cdt->cdt_request_cookie_hash = NULL;
+
+       return rc;
 }
 
 /**
@@ -829,6 +947,12 @@ int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
 
        lu_env_fini(&cdt->cdt_env);
 
+       cfs_hash_putref(cdt->cdt_agent_record_hash);
+       cdt->cdt_agent_record_hash = NULL;
+
+       cfs_hash_putref(cdt->cdt_request_cookie_hash);
+       cdt->cdt_request_cookie_hash = NULL;
+
        RETURN(0);
 }
 
@@ -843,8 +967,9 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        struct coordinator      *cdt = &mdt->mdt_coordinator;
        int                      rc;
        void                    *ptr;
-       struct mdt_thread_info  *cdt_mti;
        struct task_struct      *task;
+       int                      request_sz;
+       struct hsm_thread_data   thread_data;
        ENTRY;
 
        /* functions defined but not yet used
@@ -852,8 +977,9 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
         */
        ptr = dump_requests;
 
-       if (cdt->cdt_state != CDT_STOPPED) {
-               CERROR("%s: Coordinator already started\n",
+       rc = set_cdt_state(cdt, CDT_INIT, NULL);
+       if (rc) {
+               CERROR("%s: Coordinator already started or stopping\n",
                       mdt_obd_name(mdt));
                RETURN(-EALREADY);
        }
@@ -861,9 +987,6 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        CLASSERT(1 << (CDT_POLICY_SHIFT_COUNT - 1) == CDT_POLICY_LAST);
        cdt->cdt_policy = CDT_DEFAULT_POLICY;
 
-       cdt->cdt_state = CDT_INIT;
-
-       atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
        /* just need to be larger than previous one */
        /* cdt_last_cookie is protected by cdt_llog_lock */
        cdt->cdt_last_cookie = cfs_time_current_sec();
@@ -876,32 +999,42 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
         * /proc entries are created by the coordinator thread */
 
        /* set up list of started restore requests */
-       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       rc = mdt_hsm_pending_restore(cdt_mti);
+       thread_data.cdt_mti =
+               lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = mdt_hsm_pending_restore(thread_data.cdt_mti);
        if (rc)
                CERROR("%s: cannot take the layout locks needed"
                       " for registered restore: %d\n",
                       mdt_obd_name(mdt), rc);
 
-       task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
+       if (mdt->mdt_bottom->dd_rdonly)
+               RETURN(0);
+
+       /* Allocate the initial hsd.request[] vector*/
+       request_sz = cdt->cdt_max_requests * sizeof(struct hsm_scan_request);
+       OBD_ALLOC_LARGE(thread_data.request, request_sz);
+       if (!thread_data.request) {
+               set_cdt_state(cdt, CDT_STOPPED, NULL);
+               RETURN(-ENOMEM);
+       }
+
+       task = kthread_run(mdt_coordinator, &thread_data, "hsm_cdtr");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
-               cdt->cdt_state = CDT_STOPPED;
+               set_cdt_state(cdt, CDT_STOPPED, NULL);
+               OBD_FREE(thread_data.request, request_sz);
                CERROR("%s: error starting coordinator thread: %d\n",
                       mdt_obd_name(mdt), rc);
-               RETURN(rc);
        } else {
+               cdt->cdt_task = task;
+               wait_event(cdt->cdt_waitq,
+                          cdt->cdt_state != CDT_INIT);
                CDEBUG(D_HSM, "%s: coordinator thread started\n",
                       mdt_obd_name(mdt));
                rc = 0;
        }
 
-       wait_event(cdt->cdt_thread.t_ctl_waitq,
-                      (cdt->cdt_thread.t_flags & SVC_RUNNING));
-
-       cdt->cdt_state = CDT_RUNNING;
-       mdt->mdt_opts.mo_coordinator = 1;
-       RETURN(0);
+       RETURN(rc);
 }
 
 /**
@@ -910,63 +1043,45 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
  */
 int mdt_hsm_cdt_stop(struct mdt_device *mdt)
 {
-       struct coordinator              *cdt = &mdt->mdt_coordinator;
-       struct cdt_agent_req            *car, *tmp1;
-       struct hsm_agent                *ha, *tmp2;
-       struct cdt_restore_handle       *crh, *tmp3;
-       struct mdt_thread_info          *cdt_mti;
-       ENTRY;
-
-       if (cdt->cdt_state == CDT_STOPPED) {
-               CERROR("%s: Coordinator already stopped\n",
-                      mdt_obd_name(mdt));
-               RETURN(-EALREADY);
-       }
-
-       if (cdt->cdt_state != CDT_STOPPING) {
-               /* stop coordinator thread before cleaning */
-               cdt->cdt_thread.t_flags = SVC_STOPPING;
-               wake_up(&cdt->cdt_thread.t_ctl_waitq);
-               wait_event(cdt->cdt_thread.t_ctl_waitq,
-                          cdt->cdt_thread.t_flags & SVC_STOPPED);
-       }
-       cdt->cdt_state = CDT_STOPPED;
-
-       /* start cleaning */
-       down_write(&cdt->cdt_request_lock);
-       list_for_each_entry_safe(car, tmp1, &cdt->cdt_requests,
-                                car_request_list) {
-               list_del(&car->car_request_list);
-               mdt_cdt_free_request(car);
-       }
-       up_write(&cdt->cdt_request_lock);
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       int rc;
 
-       down_write(&cdt->cdt_agent_lock);
-       list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
-               list_del(&ha->ha_list);
-               OBD_FREE_PTR(ha);
+       ENTRY;
+       /* stop coordinator thread */
+       rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
+       if (rc == 0) {
+               kthread_stop(cdt->cdt_task);
+               cdt->cdt_task = NULL;
+               set_cdt_state(cdt, CDT_STOPPED, NULL);
        }
-       up_write(&cdt->cdt_agent_lock);
 
-       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       mutex_lock(&cdt->cdt_restore_lock);
-       list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
-               struct mdt_object       *child;
+       RETURN(rc);
+}
 
-               /* give back layout lock */
-               child = mdt_object_find(&cdt->cdt_env, mdt, &crh->crh_fid);
-               if (!IS_ERR(child))
-                       mdt_object_unlock_put(cdt_mti, child, &crh->crh_lh, 1);
+static int mdt_hsm_set_exists(struct mdt_thread_info *mti,
+                             const struct lu_fid *fid,
+                             u32 archive_id)
+{
+       struct mdt_object *obj;
+       struct md_hsm mh;
+       int rc;
 
-               list_del(&crh->crh_list);
+       obj = mdt_hsm_get_md_hsm(mti, fid, &mh);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
 
-               OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
-       }
-       mutex_unlock(&cdt->cdt_restore_lock);
+       if (mh.mh_flags & HS_EXISTS &&
+           mh.mh_arch_id == archive_id)
+               GOTO(out_obj, rc = 0);
 
-       mdt->mdt_opts.mo_coordinator = 0;
+       mh.mh_flags |= HS_EXISTS;
+       mh.mh_arch_id = archive_id;
+       rc = mdt_hsm_attr_set(mti, obj, &mh);
 
-       RETURN(0);
+out_obj:
+       mdt_object_put(mti->mti_env, obj);
+out:
+       return rc;
 }
 
 /**
@@ -1012,7 +1127,7 @@ int mdt_hsm_add_hal(struct mdt_thread_info *mti,
                        }
 
                        /* find the running request to set it canceled */
-                       car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
+                       car = mdt_cdt_find_request(cdt, hai->hai_cookie);
                        if (car != NULL) {
                                car->car_canceled = 1;
                                /* uuid has to be changed to the one running the
@@ -1028,20 +1143,11 @@ int mdt_hsm_add_hal(struct mdt_thread_info *mti,
                }
 
                if (hai->hai_action == HSMA_ARCHIVE) {
-                       struct mdt_object *obj;
-                       struct md_hsm hsm;
-
-                       obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
-                       if (IS_ERR(obj) && (PTR_ERR(obj) == -ENOENT))
+                       rc = mdt_hsm_set_exists(mti, &hai->hai_fid,
+                                               hal->hal_archive_id);
+                       if (rc == -ENOENT)
                                continue;
-                       if (IS_ERR(obj))
-                               GOTO(out, rc = PTR_ERR(obj));
-
-                       hsm.mh_flags |= HS_EXISTS;
-                       hsm.mh_arch_id = hal->hal_archive_id;
-                       rc = mdt_hsm_attr_set(mti, obj, &hsm);
-                       mdt_object_put(mti->mti_env, obj);
-                       if (rc)
+                       else if (rc < 0)
                                GOTO(out, rc);
                }
 
@@ -1136,20 +1242,17 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
        int                      cl_flags = 0, rc = 0;
        struct md_hsm            mh;
        bool                     is_mh_changed;
+       bool                     need_changelog = true;
        ENTRY;
 
        /* default is to retry */
        *status = ARS_WAITING;
 
-       /* find object by FID */
+       /* find object by FID, mdt_hsm_get_md_hsm() returns obj or err
+        * if error/removed continue anyway to get correct reporting done */
        obj = mdt_hsm_get_md_hsm(mti, &car->car_hai->hai_fid, &mh);
        /* we will update MD HSM only if needed */
        is_mh_changed = false;
-       if (IS_ERR(obj)) {
-               /* object removed */
-               *status = ARS_SUCCEED;
-               goto unlock;
-       }
 
        /* no need to change mh->mh_arch_id
         * mdt_hsm_get_md_hsm() got it from disk and it is still valid
@@ -1177,9 +1280,11 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                        *status = ARS_SUCCEED;
                        break;
                default:
+                       /* retry only if current policy or requested, and
+                        * object is not on error/removed */
                        *status = (cdt->cdt_policy & CDT_NORETRY_ACTION ||
-                                  !(pgs->hpk_flags & HP_FLAG_RETRY) ?
-                                  ARS_FAILED : ARS_WAITING);
+                                  !(pgs->hpk_flags & HP_FLAG_RETRY) ||
+                                  IS_ERR(obj)) ? ARS_FAILED : ARS_WAITING;
                        break;
                }
 
@@ -1278,21 +1383,18 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                                 mh.mh_flags & HS_DIRTY ? CLF_HSM_DIRTY : 0);
 
        /* unlock is done later, after layout lock management */
-       if (is_mh_changed)
+       if (is_mh_changed && !IS_ERR(obj))
                rc = mdt_hsm_attr_set(mti, obj, &mh);
 
-unlock:
        /* we give back layout lock only if restore was successful or
-        * if restore was canceled or if policy is to not retry
+        * if no retry will be attempted and if object is still alive,
         * in other cases we just unlock the object */
-       if (car->car_hai->hai_action == HSMA_RESTORE &&
-           (pgs->hpk_errval == 0 || pgs->hpk_errval == ECANCELED ||
-            cdt->cdt_policy & CDT_NORETRY_ACTION)) {
+       if (car->car_hai->hai_action == HSMA_RESTORE) {
                struct cdt_restore_handle       *crh;
 
                /* restore in data FID done, we swap the layouts
                 * only if restore is successful */
-               if (pgs->hpk_errval == 0 && !IS_ERR_OR_NULL(obj)) {
+               if (pgs->hpk_errval == 0 && !IS_ERR(obj)) {
                        rc = hsm_swap_layouts(mti, obj, &car->car_hai->hai_dfid,
                                              &mh);
                        if (rc) {
@@ -1305,6 +1407,13 @@ unlock:
                if (*status == ARS_WAITING)
                        GOTO(out, rc);
 
+               /* restore special case, need to create ChangeLog record
+                * before to give back layout lock to avoid concurrent
+                * file updater to post out of order ChangeLog */
+               mo_changelog(env, CL_HSM, cl_flags, mdt->mdt_child,
+                            &car->car_hai->hai_fid);
+               need_changelog = false;
+
                /* give back layout lock */
                mutex_lock(&cdt->cdt_restore_lock);
                crh = mdt_hsm_restore_hdl_find(cdt, &car->car_hai->hai_fid);
@@ -1325,11 +1434,13 @@ unlock:
        GOTO(out, rc);
 
 out:
-       if (obj != NULL && !IS_ERR(obj)) {
-               mo_changelog(env, CL_HSM, cl_flags,
-                            mdt_object_child(obj));
+       /* always add a ChangeLog record */
+       if (need_changelog)
+               mo_changelog(env, CL_HSM, cl_flags, mdt->mdt_child,
+                            &car->car_hai->hai_fid);
+
+       if (!IS_ERR(obj))
                mdt_object_put(mti->mti_env, obj);
-       }
 
        RETURN(rc);
 }
@@ -1417,15 +1528,14 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
 
                rc = hsm_cdt_request_completed(mti, pgs, car, &status);
 
-               /* remove request from memory list */
-               mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
-
-               CDEBUG(D_HSM, "Updating record: fid="DFID" cookie=%#llx"
-                             " action=%s status=%s\n", PFID(&pgs->hpk_fid),
-                      pgs->hpk_cookie,
+               CDEBUG(D_HSM, "%s record: fid="DFID" cookie=%#llx action=%s "
+                             "status=%s\n",
+                      update_record ? "Updating" : "Not updating",
+                      PFID(&pgs->hpk_fid), pgs->hpk_cookie,
                       hsm_copytool_action2name(car->car_hai->hai_action),
                       agent_req_status2name(status));
 
+               /* update record first (LU-9075) */
                if (update_record) {
                        int rc1;
 
@@ -1441,9 +1551,13 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
                                       pgs->hpk_cookie);
                        rc = (rc != 0 ? rc : rc1);
                }
-               /* ct has completed a request, so a slot is available, wakeup
-                * cdt to find new work */
-               mdt_hsm_cdt_wakeup(mdt);
+
+               /* then remove request from memory list (LU-9075) */
+               mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
+
+               /* ct has completed a request, so a slot is available,
+                * signal the coordinator to find new work */
+               mdt_hsm_cdt_event(cdt);
        } else {
                /* if copytool send a progress on a canceled request
                 * we inform copytool it should stop
@@ -1506,6 +1620,8 @@ static int mdt_cancel_all_cb(const struct lu_env *env,
  */
 static int hsm_cancel_all_actions(struct mdt_device *mdt)
 {
+       struct lu_env                    env;
+       struct lu_context                session;
        struct mdt_thread_info          *mti;
        struct coordinator              *cdt = &mdt->mdt_coordinator;
        struct cdt_agent_req            *car;
@@ -1513,19 +1629,37 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
        struct hsm_action_item          *hai;
        struct hsm_cancel_all_data       hcad;
        int                              hal_sz = 0, hal_len, rc;
-       enum cdt_states                  save_state;
+       enum cdt_states                  old_state;
        ENTRY;
 
-       /* retrieve coordinator context */
-       mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc < 0)
+               RETURN(rc);
+
+       /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
+       rc = lu_context_init(&session, LCT_SERVER_SESSION);
+       if (rc < 0)
+               GOTO(out_env, rc);
+
+       lu_context_enter(&session);
+       env.le_ses = &session;
+
+       mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+       LASSERT(mti != NULL);
+
+       mti->mti_env = &env;
+       mti->mti_mdt = mdt;
+
+       hsm_init_ucred(mdt_ucred(mti));
 
        /* disable coordinator */
-       save_state = cdt->cdt_state;
-       cdt->cdt_state = CDT_DISABLE;
+       rc = set_cdt_state(cdt, CDT_DISABLE, &old_state);
+       if (rc)
+               RETURN(rc);
 
        /* send cancel to all running requests */
        down_read(&cdt->cdt_request_lock);
-       list_for_each_entry(car, &cdt->cdt_requests, car_request_list) {
+       list_for_each_entry(car, &cdt->cdt_request_list, car_request_list) {
                mdt_cdt_get_request(car);
                /* request is not yet removed from list, it will be done
                 * when copytool will return progress
@@ -1553,7 +1687,7 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
                        if (hal == NULL) {
                                mdt_cdt_put_request(car);
                                up_read(&cdt->cdt_request_lock);
-                               GOTO(out, rc = -ENOMEM);
+                               GOTO(out_cdt_state, rc = -ENOMEM);
                        }
                }
 
@@ -1591,11 +1725,15 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
        /* cancel all on-disk records */
        hcad.mdt = mdt;
 
-       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
-                             mdt_cancel_all_cb, &hcad);
-out:
-       /* enable coordinator */
-       cdt->cdt_state = save_state;
+       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, mdt_cancel_all_cb,
+                             &hcad, 0, 0, WRITE);
+out_cdt_state:
+       /* Enable coordinator, unless the coordinator was stopping. */
+       set_cdt_state(cdt, old_state, NULL);
+       lu_context_exit(&session);
+       lu_context_fini(&session);
+out_env:
+       lu_env_fini(&env);
 
        RETURN(rc);
 }
@@ -1603,13 +1741,13 @@ out:
 /**
  * check if a request is compatible with file status
  * \param hai [IN] request description
- * \param hal_an [IN] request archive number (not used)
+ * \param archive_id [IN] request archive id
  * \param rq_flags [IN] request flags
  * \param hsm [IN] file HSM metadata
  * \retval boolean
  */
 bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
-                             const int hal_an, const __u64 rq_flags,
+                             u32 archive_id, u64 rq_flags,
                              const struct md_hsm *hsm)
 {
        int      is_compat = false;
@@ -1622,6 +1760,12 @@ bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
                if (!(hsm_flags & HS_NOARCHIVE) &&
                    (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED)))
                        is_compat = true;
+
+               if (hsm_flags & HS_EXISTS &&
+                   archive_id != 0 &&
+                   archive_id != hsm->mh_arch_id)
+                       is_compat = false;
+
                break;
        case HSMA_RESTORE:
                if (!(hsm_flags & HS_DIRTY) && (hsm_flags & HS_RELEASED) &&
@@ -1709,7 +1853,7 @@ static void hsm_policy_bit2str(struct seq_file *m, const __u64 mask,
        }
        /* remove last ' ' */
        m->count--;
-       seq_putc(m, '\0');
+       seq_putc(m, '\n');
 }
 
 /* methods to read/write HSM policy flags */
@@ -1884,8 +2028,9 @@ mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
        rc = 0;
        if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
                if (cdt->cdt_state == CDT_DISABLE) {
-                       cdt->cdt_state = CDT_RUNNING;
-                       mdt_hsm_cdt_wakeup(mdt);
+                       rc = set_cdt_state(cdt, CDT_RUNNING, NULL);
+                       mdt_hsm_cdt_event(cdt);
+                       wake_up(&cdt->cdt_waitq);
                } else {
                        rc = mdt_hsm_cdt_start(mdt);
                }
@@ -1896,7 +2041,7 @@ mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
                               mdt_obd_name(mdt));
                        rc = -EALREADY;
                } else {
-                       cdt->cdt_state = CDT_STOPPING;
+                       rc = mdt_hsm_cdt_stop(mdt);
                }
        } else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
                if ((cdt->cdt_state == CDT_STOPPING) ||
@@ -1905,7 +2050,7 @@ mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
                               mdt_obd_name(mdt));
                        rc = -EINVAL;
                } else {
-                       cdt->cdt_state = CDT_DISABLE;
+                       rc = set_cdt_state(cdt, CDT_DISABLE, NULL);
                }
        } else if (strcmp(kernbuf, CDT_PURGE_CMD) == 0) {
                rc = hsm_cancel_all_actions(mdt);
@@ -1936,18 +2081,7 @@ int mdt_hsm_cdt_control_seq_show(struct seq_file *m, void *data)
 
        cdt = &(mdt_dev(obd->obd_lu_dev)->mdt_coordinator);
 
-       if (cdt->cdt_state == CDT_INIT)
-               seq_printf(m, "init\n");
-       else if (cdt->cdt_state == CDT_RUNNING)
-               seq_printf(m, "enabled\n");
-       else if (cdt->cdt_state == CDT_STOPPING)
-               seq_printf(m, "stopping\n");
-       else if (cdt->cdt_state == CDT_STOPPED)
-               seq_printf(m, "stopped\n");
-       else if (cdt->cdt_state == CDT_DISABLE)
-               seq_printf(m, "disabled\n");
-       else
-               seq_printf(m, "unknown\n");
+       seq_printf(m, "%s\n", cdt_mdt_state2str(cdt->cdt_state));
 
        RETURN(0);
 }
@@ -2097,6 +2231,36 @@ mdt_hsm_other_request_mask_seq_write(struct file *file, const char __user *buf,
                                           &cdt->cdt_other_request_mask);
 }
 
+static int mdt_hsm_cdt_raolu_seq_show(struct seq_file *m, void *data)
+{
+       struct mdt_device *mdt = m->private;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       ENTRY;
+
+       seq_printf(m, "%d\n", (int)cdt->cdt_remove_archive_on_last_unlink);
+       RETURN(0);
+}
+
+static ssize_t
+mdt_hsm_cdt_raolu_seq_write(struct file *file, const char __user *buffer,
+                           size_t count, loff_t *off)
+
+{
+       struct seq_file *m = file->private_data;
+       struct mdt_device *mdt = m->private;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       __s64 val;
+       int rc;
+       ENTRY;
+
+       rc = lprocfs_str_to_s64(buffer, count, &val);
+       if (rc < 0)
+               RETURN(rc);
+
+       cdt->cdt_remove_archive_on_last_unlink = val;
+       RETURN(count);
+}
+
 LPROC_SEQ_FOPS(mdt_hsm_cdt_loop_period);
 LPROC_SEQ_FOPS(mdt_hsm_cdt_grace_delay);
 LPROC_SEQ_FOPS(mdt_hsm_cdt_active_req_timeout);
@@ -2105,6 +2269,7 @@ LPROC_SEQ_FOPS(mdt_hsm_cdt_default_archive_id);
 LPROC_SEQ_FOPS(mdt_hsm_user_request_mask);
 LPROC_SEQ_FOPS(mdt_hsm_group_request_mask);
 LPROC_SEQ_FOPS(mdt_hsm_other_request_mask);
+LPROC_SEQ_FOPS(mdt_hsm_cdt_raolu);
 
 static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
        { .name =       "agents",
@@ -2132,5 +2297,7 @@ static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
          .fops =       &mdt_hsm_group_request_mask_fops,       },
        { .name =       "other_request_mask",
          .fops =       &mdt_hsm_other_request_mask_fops,       },
+       { .name =       "remove_archive_on_last_unlink",
+         .fops =       &mdt_hsm_cdt_raolu_fops,                },
        { 0 }
 };