Whamcloud - gitweb
LU-3343 mdt: HSM coordinator main thread 12/6912/14
authorjcl <jacques-charles.lafoucriere@cea.fr>
Sat, 6 Jul 2013 12:57:08 +0000 (14:57 +0200)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 7 Aug 2013 20:20:52 +0000 (20:20 +0000)
This patch implements the HSM coordinator. It is a MDT
thread in charge of schedulling HSM requests to
agents.

Signed-off-by: JC Lafoucriere <jacques-charles.lafoucriere@cea.fr>
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I18fb2fad94b3972b9a09fd093e259f4ad50d810f
Reviewed-on: http://review.whamcloud.com/6912
Tested-by: Hudson
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
13 files changed:
lustre/include/lustre/lustre_user.h
lustre/include/md_object.h
lustre/mdt/Makefile.in
lustre/mdt/mdt_coordinator.c [new file with mode: 0644]
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_hsm.c
lustre/mdt/mdt_hsm_cdt_actions.c
lustre/mdt/mdt_hsm_cdt_agent.c
lustre/mdt/mdt_hsm_cdt_client.c
lustre/mdt/mdt_hsm_cdt_requests.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lproc.c
lustre/obdclass/md_attrs.c

index c856390..045ff2b 100644 (file)
@@ -1071,7 +1071,7 @@ struct hsm_action_item {
  * \param len [IN] max buffer len
  * \retval buffer
  */
  * \param len [IN] max buffer len
  * \retval buffer
  */
-static inline char *hai_dump_data_field(struct hsm_action_item *hai,
+static inline char *hai_dump_data_field(const struct hsm_action_item *hai,
                                         char *buffer, int len)
 {
         int i, sz, data_len;
                                         char *buffer, int len)
 {
         int i, sz, data_len;
index 6b83789..304646c 100644 (file)
@@ -849,7 +849,7 @@ struct lu_local_obj_desc {
 
 int lustre_buf2som(void *buf, int rc, struct md_som_data *msd);
 int lustre_buf2hsm(void *buf, int rc, struct md_hsm *mh);
 
 int lustre_buf2som(void *buf, int rc, struct md_som_data *msd);
 int lustre_buf2hsm(void *buf, int rc, struct md_hsm *mh);
-void lustre_hsm2buf(void *buf, struct md_hsm *mh);
+void lustre_hsm2buf(void *buf, const struct md_hsm *mh);
 
 enum {
        UCRED_INVALID   = -1,
 
 enum {
        UCRED_INVALID   = -1,
index a6b55e5..2b23a67 100644 (file)
@@ -6,5 +6,6 @@ mdt-objs += mdt_hsm_cdt_actions.o
 mdt-objs += mdt_hsm_cdt_requests.o
 mdt-objs += mdt_hsm_cdt_client.o
 mdt-objs += mdt_hsm_cdt_agent.o
 mdt-objs += mdt_hsm_cdt_requests.o
 mdt-objs += mdt_hsm_cdt_client.o
 mdt-objs += mdt_hsm_cdt_agent.o
+mdt-objs += mdt_coordinator.o
 
 @INCLUDE_RULES@
 
 @INCLUDE_RULES@
diff --git a/lustre/mdt/mdt_coordinator.c b/lustre/mdt/mdt_coordinator.c
new file mode 100644 (file)
index 0000000..a48bc36
--- /dev/null
@@ -0,0 +1,1999 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, 2013, Intel Corporation.
+ * Use is subject to license terms.
+ * Copyright (c) 2011, 2012 Commissariat a l'energie atomique et aux energies
+ *                          alternatives
+ */
+/*
+ * lustre/mdt/mdt_coordinator.c
+ *
+ * Lustre HSM Coordinator
+ *
+ * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
+ * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
+ * Author: Thomas Leibovici <thomas.leibovici@cea.fr>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <obd_support.h>
+#include <lustre_net.h>
+#include <lustre_export.h>
+#include <obd.h>
+#include <obd_lov.h>
+#include <lprocfs_status.h>
+#include <lustre_log.h>
+#include "mdt_internal.h"
+
+static struct lprocfs_vars lprocfs_mdt_hsm_vars[];
+
+/**
+ * get obj and HSM attributes on a fid
+ * \param mti [IN] context
+ * \param fid [IN] object fid
+ * \param hsm [OUT] HSM meta data
+ * \retval obj
+ */
+struct mdt_object *mdt_hsm_get_md_hsm(struct mdt_thread_info *mti,
+                                     const struct lu_fid *fid,
+                                     struct md_hsm *hsm)
+{
+       struct md_attr          *ma;
+       struct mdt_object       *obj;
+       int                      rc;
+       ENTRY;
+
+       ma = &mti->mti_attr;
+       ma->ma_need = MA_HSM;
+       ma->ma_valid = 0;
+
+       /* find object by FID */
+       obj = mdt_object_find(mti->mti_env, mti->mti_mdt, fid);
+       if (IS_ERR(obj))
+               RETURN(obj);
+
+       if (!mdt_object_exists(obj)) {
+               /* no more object */
+               mdt_object_put(mti->mti_env, obj);
+               RETURN(ERR_PTR(-ENOENT));
+       }
+
+       rc = mdt_attr_get_complex(mti, obj, ma);
+       if (rc) {
+               mdt_object_put(mti->mti_env, obj);
+               RETURN(ERR_PTR(rc));
+       }
+
+       if (ma->ma_valid & MA_HSM)
+               *hsm = ma->ma_hsm;
+       else
+               memset(hsm, 0, sizeof(*hsm));
+       ma->ma_valid = 0;
+       RETURN(obj);
+}
+
+void mdt_hsm_dump_hal(int level, const char *prefix,
+                     struct hsm_action_list *hal)
+{
+       int                      i, sz;
+       struct hsm_action_item  *hai;
+       char                     buf[12];
+
+       CDEBUG(level, "%s: HAL header: version %X count %d compound "LPX64
+                     " archive_id %d flags "LPX64"\n",
+              prefix, hal->hal_version, hal->hal_count,
+              hal->hal_compound_id, hal->hal_archive_id, hal->hal_flags);
+
+       hai = hai_zero(hal);
+       for (i = 0; i < hal->hal_count; i++) {
+               sz = hai->hai_len - sizeof(*hai);
+               CDEBUG(level, "%s %d: fid="DFID" dfid="DFID
+                      " compound/cookie="LPX64"/"LPX64
+                      " action=%s extent="LPX64"-"LPX64" gid="LPX64
+                      " datalen=%d data=[%s]\n",
+                      prefix, i,
+                      PFID(&hai->hai_fid), PFID(&hai->hai_dfid),
+                      hal->hal_compound_id, hai->hai_cookie,
+                      hsm_copytool_action2name(hai->hai_action),
+                      hai->hai_extent.offset,
+                      hai->hai_extent.length,
+                      hai->hai_gid, sz,
+                      hai_dump_data_field(hai, buf, sizeof(buf)));
+               hai = hai_next(hai);
+       }
+}
+
+/**
+ * data passed to llog_cat_process() callback
+ * to scan requests and take actions
+ */
+struct hsm_scan_data {
+       struct mdt_thread_info          *mti;
+       char                             fs_name[MTI_NAME_MAXLEN+1];
+       /* request to be send to agents */
+       int                              request_sz;    /** allocated size */
+       int                              max_request;   /** vector size */
+       int                              request_cnt;   /** used count */
+       struct {
+               int                      hal_sz;
+               int                      hal_used_sz;
+               struct hsm_action_list  *hal;
+       } *request;
+       /* records to be canceled */
+       int                              max_cookie;    /** vector size */
+       int                              cookie_cnt;    /** used count */
+       __u64                           *cookies;
+};
+
+/**
+ *  llog_cat_process() callback, used to:
+ *  - find waiting request and start action
+ *  - purge canceled and done requests
+ * \param env [IN] environment
+ * \param llh [IN] llog handle
+ * \param hdr [IN] llog record
+ * \param data [IN/OUT] cb data = struct hsm_scan_data
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int mdt_coordinator_cb(const struct lu_env *env,
+                             struct llog_handle *llh,
+                             struct llog_rec_hdr *hdr,
+                             void *data)
+{
+       const struct llog_agent_req_rec *larr;
+       struct hsm_scan_data            *hsd;
+       struct hsm_action_item          *hai;
+       struct mdt_device               *mdt;
+       struct coordinator              *cdt;
+       int                              rc;
+       ENTRY;
+
+       hsd = data;
+       mdt = hsd->mti->mti_mdt;
+       cdt = &mdt->mdt_coordinator;
+
+       larr = (struct llog_agent_req_rec *)hdr;
+       dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
+       switch (larr->arr_status) {
+       case ARS_WAITING: {
+               int i, empty_slot, found;
+
+               /* Are agents full? */
+               if (atomic_read(&cdt->cdt_request_count) ==
+                   cdt->cdt_max_request)
+                       break;
+
+               /* first search if the request if known in the list we have
+                * build and if there is room in the request vector */
+               empty_slot = -1;
+               found = -1;
+               for (i = 0; i < hsd->max_request &&
+                           (empty_slot == -1 || found == -1); i++) {
+                       if (hsd->request[i].hal == NULL) {
+                               empty_slot = i;
+                               continue;
+                       }
+                       if (hsd->request[i].hal->hal_compound_id ==
+                               larr->arr_compound_id) {
+                               found = i;
+                               continue;
+                       }
+               }
+               if ((found == -1) && (empty_slot == -1))
+                       /* unknown request and no more room for new request,
+                        * continue scan for to find other entries for
+                        * already found request
+                        */
+                       RETURN(0);
+
+               if (found == -1) {
+                       struct hsm_action_list *hal;
+
+                       /* request is not already known */
+                       /* allocates hai vector size just needs to be large
+                        * enough */
+                       hsd->request[empty_slot].hal_sz =
+                                    sizeof(*hsd->request[empty_slot].hal) +
+                                    cfs_size_round(MTI_NAME_MAXLEN+1) +
+                                    2 * cfs_size_round(larr->arr_hai.hai_len);
+                       OBD_ALLOC(hal, hsd->request[empty_slot].hal_sz);
+                       if (!hal) {
+                               CERROR("%s: Cannot allocate memory (%d o)"
+                                      "for compound "LPX64"\n",
+                                      mdt_obd_name(mdt),
+                                      hsd->request[i].hal_sz,
+                                      larr->arr_compound_id);
+                               RETURN(-ENOMEM);
+                       }
+                       hal->hal_version = HAL_VERSION;
+                       strncpy(hal->hal_fsname, hsd->fs_name,
+                               MTI_NAME_MAXLEN);
+                       hal->hal_fsname[MTI_NAME_MAXLEN] = '\0';
+                       hal->hal_compound_id = larr->arr_compound_id;
+                       hal->hal_archive_id = larr->arr_archive_id;
+                       hal->hal_flags = larr->arr_flags;
+                       hal->hal_count = 0;
+                       hsd->request[empty_slot].hal_used_sz = hal_size(hal);
+                       hsd->request[empty_slot].hal = hal;
+                       hsd->request_cnt++;
+                       found = empty_slot;
+                       hai = hai_zero(hal);
+               } else {
+                       /* request is known */
+                       /* we check if record archive num is the same as the
+                        * known request, if not we will serve it in multiple
+                        * time because we do not know if the agent can serve
+                        * multiple backend
+                        * a use case is a compound made of multiple restore
+                        * where the files are not archived in the same backend
+                        */
+                       if (larr->arr_archive_id !=
+                           hsd->request[found].hal->hal_archive_id)
+                               RETURN(0);
+
+                       if (hsd->request[found].hal_sz <
+                           hsd->request[found].hal_used_sz +
+                            cfs_size_round(larr->arr_hai.hai_len)) {
+                               /* Not enough room, need an extension */
+                               void *hal_buffer;
+                               int sz;
+
+                               sz = 2 * hsd->request[found].hal_sz;
+                               OBD_ALLOC(hal_buffer, sz);
+                               if (!hal_buffer) {
+                                       CERROR("%s: Cannot allocate memory "
+                                              "(%d o) for compound "LPX64"\n",
+                                              mdt_obd_name(mdt), sz,
+                                              larr->arr_compound_id);
+                                       RETURN(-ENOMEM);
+                               }
+                               memcpy(hal_buffer, hsd->request[found].hal,
+                                      hsd->request[found].hal_used_sz);
+                               OBD_FREE(hsd->request[found].hal,
+                                        hsd->request[found].hal_sz);
+                               hsd->request[found].hal = hal_buffer;
+                               hsd->request[found].hal_sz = sz;
+                       }
+                       hai = hai_zero(hsd->request[found].hal);
+                       for (i = 0; i < hsd->request[found].hal->hal_count;
+                            i++)
+                               hai = hai_next(hai);
+               }
+               memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
+               hai->hai_cookie = larr->arr_hai.hai_cookie;
+               hai->hai_gid = larr->arr_hai.hai_gid;
+
+               hsd->request[found].hal_used_sz +=
+                                                  cfs_size_round(hai->hai_len);
+               hsd->request[found].hal->hal_count++;
+               break;
+       }
+       case ARS_STARTED: {
+               struct cdt_agent_req *car;
+               cfs_time_t last;
+
+               /* we search for a running request
+                * error may happen if coordinator crashes or stopped
+                * with running request
+                */
+               car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie, NULL);
+               if (car == NULL) {
+                       last = larr->arr_req_create;
+               } else {
+                       last = car->car_req_update;
+                       mdt_cdt_put_request(car);
+               }
+
+               /* test if request too long, if yes cancel it
+                * the same way the copy tool acknowledge a cancel request */
+               if ((last + cdt->cdt_timeout) < cfs_time_current_sec()) {
+                       struct hsm_progress_kernel pgs;
+
+                       dump_llog_agent_req_rec("mdt_coordinator_cb(): "
+                                               "request timeouted, start "
+                                               "cleaning", larr);
+                       /* a too old cancel request just needs to be removed
+                        * this can happen, if copy tool does not support cancel
+                        * for other requests, we have to remove the running
+                        * request and notify the copytool
+                        */
+                       pgs.hpk_fid = larr->arr_hai.hai_fid;
+                       pgs.hpk_cookie = larr->arr_hai.hai_cookie;
+                       pgs.hpk_extent = larr->arr_hai.hai_extent;
+                       pgs.hpk_flags = HP_FLAG_COMPLETED;
+                       pgs.hpk_errval = ENOSYS;
+                       pgs.hpk_data_version = 0;
+                       /* update request state, but do not record in llog, to
+                        * avoid deadlock on cdt_llog_lock
+                        */
+                       rc = mdt_hsm_update_request_state(hsd->mti, &pgs, 0);
+                       if (rc)
+                               CERROR("%s: Cannot cleanup timeouted request: "
+                                      DFID" for cookie "LPX64" action=%s\n",
+                                      mdt_obd_name(mdt),
+                                      PFID(&pgs.hpk_fid), pgs.hpk_cookie,
+                                      hsm_copytool_action2name(
+                                                    larr->arr_hai.hai_action));
+
+                       /* add the cookie to the list of record to be
+                        * canceled by caller */
+                       if (hsd->max_cookie == (hsd->cookie_cnt - 1)) {
+                               __u64 *ptr, *old_ptr;
+                               int old_sz, new_sz, new_cnt;
+
+                               /* need to increase vector size */
+                               old_sz = sizeof(__u64) * hsd->max_cookie;
+                               old_ptr = hsd->cookies;
+
+                               new_cnt = 2 * hsd->max_cookie;
+                               new_sz = sizeof(__u64) * new_cnt;
+
+                               OBD_ALLOC(ptr, new_sz);
+                               if (!ptr) {
+                                       CERROR("%s: Cannot allocate memory "
+                                              "(%d o) for cookie vector\n",
+                                              mdt_obd_name(mdt), new_sz);
+                                       RETURN(-ENOMEM);
+                               }
+                               memcpy(ptr, hsd->cookies, old_sz);
+                               hsd->cookies = ptr;
+                               hsd->max_cookie = new_cnt;
+                               OBD_FREE(old_ptr, old_sz);
+                       }
+                       hsd->cookies[hsd->cookie_cnt] =
+                                                      larr->arr_hai.hai_cookie;
+                       hsd->cookie_cnt++;
+               }
+               break;
+       }
+       case ARS_FAILED:
+       case ARS_CANCELED:
+       case ARS_SUCCEED:
+               if ((larr->arr_req_change + cdt->cdt_delay) <
+                   cfs_time_current_sec())
+                       RETURN(LLOG_DEL_RECORD);
+               break;
+       }
+       RETURN(0);
+}
+
+/**
+ * create /proc entries for coordinator
+ * \param mdt [IN]
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int hsm_cdt_procfs_init(struct mdt_device *mdt)
+{
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       int                      rc = 0;
+       ENTRY;
+
+       /* init /proc entries, failure is not critical */
+       cdt->cdt_proc_dir = lprocfs_register("hsm",
+                                            mdt2obd_dev(mdt)->obd_proc_entry,
+                                            lprocfs_mdt_hsm_vars, mdt);
+       if (IS_ERR(cdt->cdt_proc_dir)) {
+               rc = PTR_ERR(cdt->cdt_proc_dir);
+               CERROR("%s: Cannot create 'hsm' directory in mdt proc dir,"
+                      " rc=%d\n", mdt_obd_name(mdt), rc);
+               cdt->cdt_proc_dir = NULL;
+               RETURN(rc);
+       }
+
+       RETURN(0);
+}
+
+/**
+ * coordinator thread
+ * \param data [IN] obd device
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int mdt_coordinator(void *data)
+{
+       struct mdt_thread_info  *mti = data;
+       struct mdt_device       *mdt = mti->mti_mdt;
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       struct hsm_scan_data     hsd = { 0 };
+       int                      rc = 0;
+       ENTRY;
+
+       cdt->cdt_thread.t_flags = SVC_RUNNING;
+       cfs_waitq_signal(&cdt->cdt_thread.t_ctl_waitq);
+
+       CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
+              mdt_obd_name(mdt), cfs_curproc_pid());
+
+       /*
+        * create /proc entries for coordinator
+        */
+       hsm_cdt_procfs_init(mdt);
+       /* timeouted cookie vector initialization */
+       hsd.max_cookie = 0;
+       hsd.cookie_cnt = 0;
+       hsd.cookies = NULL;
+       /* we use a copy of cdt_max_request in the cb, so if cdt_max_request
+        * increases due to a change from /proc we do not overflow the
+        * hsd.request[] vector
+        */
+       hsd.max_request = cdt->cdt_max_request;
+       hsd.request_sz = hsd.max_request * sizeof(*hsd.request);
+       OBD_ALLOC(hsd.request, hsd.request_sz);
+       if (!hsd.request)
+               GOTO(out, rc = -ENOMEM);
+
+       hsd.mti = mti;
+       obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
+
+       while (1) {
+               struct l_wait_info lwi;
+               int i;
+
+               lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
+                                 NULL, NULL);
+               l_wait_event(cdt->cdt_thread.t_ctl_waitq,
+                            (cdt->cdt_thread.t_flags &
+                             (SVC_STOPPING|SVC_EVENT)),
+                            &lwi);
+
+               CDEBUG(D_HSM, "coordinator resumes\n");
+
+               if ((cdt->cdt_thread.t_flags & SVC_STOPPING) ||
+                   (cdt->cdt_state == CDT_STOPPING)) {
+                       cdt->cdt_thread.t_flags &= ~SVC_STOPPING;
+                       rc = 0;
+                       break;
+               }
+
+               /* wake up before timeout, new work arrives */
+               if (cdt->cdt_thread.t_flags & SVC_EVENT)
+                       cdt->cdt_thread.t_flags &= ~SVC_EVENT;
+
+               /* if coordinator is suspended continue to wait */
+               if (cdt->cdt_state == CDT_DISABLE) {
+                       CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
+                       continue;
+               }
+
+               CDEBUG(D_HSM, "coordinator starts reading llog\n");
+
+               if (hsd.max_request != cdt->cdt_max_request) {
+                       /* cdt_max_request has changed,
+                        * we need to allocate a new buffer
+                        */
+                       OBD_FREE(hsd.request, hsd.request_sz);
+                       hsd.max_request = cdt->cdt_max_request;
+                       hsd.request_sz =
+                                  hsd.max_request * sizeof(*hsd.request);
+                       OBD_ALLOC(hsd.request, hsd.request_sz);
+                       if (!hsd.request) {
+                               rc = -ENOMEM;
+                               break;
+                       }
+               }
+
+               /* create canceled cookie vector for an arbitrary size
+                * if needed, vector will grow during llog scan
+                */
+               hsd.max_cookie = 10;
+               hsd.cookie_cnt = 0;
+               OBD_ALLOC(hsd.cookies, hsd.max_cookie * sizeof(__u64));
+               if (!hsd.cookies) {
+                       rc = -ENOMEM;
+                       goto clean_cb_alloc;
+               }
+               hsd.request_cnt = 0;
+
+               rc = cdt_llog_process(mti->mti_env, mdt,
+                                     mdt_coordinator_cb, &hsd);
+               if (rc < 0)
+                       goto clean_cb_alloc;
+
+               CDEBUG(D_HSM, "Found %d requests to send and %d"
+                             " requests to cancel\n",
+                      hsd.request_cnt, hsd.cookie_cnt);
+               /* first we cancel llog records of the timeouted requests */
+               if (hsd.cookie_cnt > 0) {
+                       rc = mdt_agent_record_update(mti->mti_env, mdt,
+                                                    hsd.cookies,
+                                                    hsd.cookie_cnt,
+                                                    ARS_CANCELED);
+                       if (rc)
+                               CERROR("%s: mdt_agent_record_update() failed, "
+                                      "rc=%d, cannot update status to %s "
+                                      "for %d cookies\n",
+                                      mdt_obd_name(mdt), rc,
+                                      agent_req_status2name(ARS_CANCELED),
+                                      hsd.cookie_cnt);
+               }
+
+               if (list_empty(&cdt->cdt_agents)) {
+                       CDEBUG(D_HSM, "no agent available, "
+                                     "coordinator sleeps\n");
+                       goto clean_cb_alloc;
+               }
+
+               /* here hsd contains a list of requests to be started */
+               for (i = 0; i < hsd.max_request; i++) {
+                       struct hsm_action_list  *hal;
+                       struct hsm_action_item  *hai;
+                       __u64                   *cookies;
+                       int                      sz, j;
+                       enum agent_req_status    status;
+
+                       /* still room for work ? */
+                       if (atomic_read(&cdt->cdt_request_count) ==
+                           cdt->cdt_max_request)
+                               break;
+
+                       if (hsd.request[i].hal == NULL)
+                               continue;
+
+                       /* found a request, we start it */
+                       /* kuc payload allocation so we avoid an additionnal
+                        * allocation in mdt_hsm_agent_send()
+                        */
+                       hal = kuc_alloc(hsd.request[i].hal_used_sz,
+                                       KUC_TRANSPORT_HSM, HMT_ACTION_LIST);
+                       if (IS_ERR(hal)) {
+                               CERROR("%s: Cannot allocate memory (%d o) "
+                                      "for compound "LPX64"\n",
+                                      mdt_obd_name(mdt),
+                                      hsd.request[i].hal_used_sz,
+                                      hsd.request[i].hal->hal_compound_id);
+                               continue;
+                       }
+                       memcpy(hal, hsd.request[i].hal,
+                              hsd.request[i].hal_used_sz);
+
+                       rc = mdt_hsm_agent_send(mti, hal, 0);
+                       /* if failure, we suppose it is temporary
+                        * if the copy tool failed to do the request
+                        * it has to use hsm_progress
+                        */
+                       status = (rc ? ARS_WAITING : ARS_STARTED);
+
+                       /* set up cookie vector to set records status
+                        * after copy tools start or failed
+                        */
+                       sz = hsd.request[i].hal->hal_count * sizeof(__u64);
+                       OBD_ALLOC(cookies, sz);
+                       if (cookies == NULL) {
+                               CERROR("%s: Cannot allocate memory (%d o) "
+                                      "for cookies vector "LPX64"\n",
+                                      mdt_obd_name(mdt), sz,
+                                      hsd.request[i].hal->hal_compound_id);
+                               kuc_free(hal, hsd.request[i].hal_used_sz);
+                               continue;
+                       }
+                       hai = hai_zero(hal);
+                       for (j = 0; j < hsd.request[i].hal->hal_count; j++) {
+                               cookies[j] = hai->hai_cookie;
+                               hai = hai_next(hai);
+                       }
+
+                       rc = mdt_agent_record_update(mti->mti_env, mdt, cookies,
+                                               hsd.request[i].hal->hal_count,
+                                               status);
+                       if (rc)
+                               CERROR("%s: mdt_agent_record_update() failed, "
+                                      "rc=%d, cannot update status to %s "
+                                      "for %d cookies\n",
+                                      mdt_obd_name(mdt), rc,
+                                      agent_req_status2name(status),
+                                      hsd.request[i].hal->hal_count);
+
+                       OBD_FREE(cookies, sz);
+                       kuc_free(hal, hsd.request[i].hal_used_sz);
+               }
+clean_cb_alloc:
+               /* free cookie vector allocated for/by callback */
+               if (hsd.cookies) {
+                       OBD_FREE(hsd.cookies, hsd.max_cookie * sizeof(__u64));
+                       hsd.max_cookie = 0;
+                       hsd.cookie_cnt = 0;
+                       hsd.cookies = NULL;
+               }
+
+               /* free hal allocated by callback */
+               for (i = 0; i < hsd.max_request; i++) {
+                       if (hsd.request[i].hal) {
+                               OBD_FREE(hsd.request[i].hal,
+                                        hsd.request[i].hal_sz);
+                               hsd.request[i].hal_sz = 0;
+                               hsd.request[i].hal = NULL;
+                               hsd.request_cnt--;
+                       }
+               }
+               LASSERT(hsd.request_cnt == 0);
+
+               /* reset callback data */
+               memset(hsd.request, 0, hsd.request_sz);
+       }
+       EXIT;
+out:
+       if (hsd.request)
+               OBD_FREE(hsd.request, hsd.request_sz);
+
+       if (hsd.cookies)
+               OBD_FREE(hsd.cookies, hsd.max_cookie * sizeof(__u64));
+
+       if (cdt->cdt_state == CDT_STOPPING) {
+               /* request comes from /proc path, so we need to clean cdt
+                * struct */
+                mdt_hsm_cdt_stop(mdt);
+                mdt->mdt_opts.mo_coordinator = 0;
+       } else {
+               /* request comes from a thread event, generated
+                * by mdt_stop_coordinator(), we have to ack
+                * and cdt cleaning will be done by event sender
+                */
+               cdt->cdt_thread.t_flags = SVC_STOPPED;
+               cfs_waitq_signal(&cdt->cdt_thread.t_ctl_waitq);
+       }
+
+       if (rc != 0)
+               CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
+                      mdt_obd_name(mdt), cfs_curproc_pid(), rc);
+       else
+               CDEBUG(D_HSM, "%s: coordinator thread exiting, process=%d,"
+                             " no error\n",
+                      mdt_obd_name(mdt), cfs_curproc_pid());
+
+       return rc;
+}
+
+/**
+ * lookup a restore handle by FID
+ * caller needs to hold cdt_restore_lock
+ * \param cdt [IN] coordinator
+ * \param fid [IN] FID
+ * \retval cdt_restore_handle found
+ * \retval NULL not found
+ */
+static struct cdt_restore_handle *hsm_restore_hdl_find(struct coordinator *cdt,
+                                                      const struct lu_fid *fid)
+{
+       struct cdt_restore_handle       *crh;
+       ENTRY;
+
+       list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
+               if (lu_fid_eq(&crh->crh_fid, fid))
+                       RETURN(crh);
+       }
+       RETURN(NULL);
+}
+
+/**
+ * data passed to llog_cat_process() callback
+ * to scan requests and take actions
+ */
+struct hsm_restore_data {
+       struct mdt_thread_info  *hrd_mti;
+};
+
+/**
+ *  llog_cat_process() callback, used to:
+ *  - find restore request and allocate the restore handle
+ * \param env [IN] environment
+ * \param llh [IN] llog handle
+ * \param hdr [IN] llog record
+ * \param data [IN/OUT] cb data = struct hsm_restore_data
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int hsm_restore_cb(const struct lu_env *env,
+                         struct llog_handle *llh,
+                         struct llog_rec_hdr *hdr, void *data)
+{
+       struct llog_agent_req_rec       *larr;
+       struct hsm_restore_data         *hrd;
+       struct cdt_restore_handle       *crh;
+       struct hsm_action_item          *hai;
+       struct mdt_thread_info          *mti;
+       struct coordinator              *cdt;
+       struct mdt_object               *child;
+       int rc;
+       ENTRY;
+
+       hrd = data;
+       mti = hrd->hrd_mti;
+       cdt = &mti->mti_mdt->mdt_coordinator;
+
+       larr = (struct llog_agent_req_rec *)hdr;
+       hai = &larr->arr_hai;
+       if ((hai->hai_action != HSMA_RESTORE) ||
+            agent_req_in_final_state(larr->arr_status))
+               RETURN(0);
+
+       /* restore request not in a final state */
+
+       OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
+       if (crh == NULL)
+               RETURN(-ENOMEM);
+
+       crh->crh_fid = hai->hai_fid;
+       /* in V1 all file is restored
+       crh->extent.start = hai->hai_extent.offset;
+       crh->extent.end = hai->hai_extent.offset + hai->hai_extent.length;
+       */
+       crh->crh_extent.start = 0;
+       crh->crh_extent.end = OBD_OBJECT_EOF;
+       /* get the layout lock */
+       mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
+       child = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
+                                    MDS_INODELOCK_LAYOUT);
+       if (IS_ERR(child))
+               GOTO(out, rc = PTR_ERR(child));
+
+       rc = 0;
+       /* we choose to not keep a reference
+        * on the object during the restore time which can be very long */
+       mdt_object_put(mti->mti_env, child);
+
+       mutex_lock(&cdt->cdt_restore_lock);
+       list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
+       mutex_unlock(&cdt->cdt_restore_lock);
+
+out:
+       RETURN(rc);
+}
+
+/**
+ * restore coordinator state at startup
+ * the goal is to take a layout lock for each registered restore request
+ * \param mti [IN] context
+ */
+static int mdt_hsm_pending_restore(struct mdt_thread_info *mti)
+{
+       struct hsm_restore_data  hrd;
+       int                      rc;
+       ENTRY;
+
+       hrd.hrd_mti = mti;
+
+       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
+                             hsm_restore_cb, &hrd);
+
+       RETURN(rc);
+}
+
+static int hsm_init_ucred(struct lu_ucred *uc)
+{
+       ENTRY;
+
+       uc->uc_valid = UCRED_OLD;
+       uc->uc_o_uid = 0;
+       uc->uc_o_gid = 0;
+       uc->uc_o_fsuid = 0;
+       uc->uc_o_fsgid = 0;
+       uc->uc_uid = 0;
+       uc->uc_gid = 0;
+       uc->uc_fsuid = 0;
+       uc->uc_fsgid = 0;
+       uc->uc_suppgids[0] = -1;
+       uc->uc_suppgids[1] = -1;
+       uc->uc_cap = 0;
+       uc->uc_umask = 0777;
+       uc->uc_ginfo = NULL;
+       uc->uc_identity = NULL;
+
+       RETURN(0);
+}
+
+/**
+ * wake up coordinator thread
+ * \param mdt [IN] device
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
+{
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       ENTRY;
+
+       if (cdt->cdt_state == CDT_STOPPED)
+               RETURN(-ESRCH);
+
+       /* wake up coordinator */
+       cdt->cdt_thread.t_flags = SVC_EVENT;
+       cfs_waitq_signal(&cdt->cdt_thread.t_ctl_waitq);
+
+       RETURN(0);
+}
+
+/**
+ * initialize coordinator struct
+ * \param mdt [IN] device
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int mdt_hsm_cdt_init(struct mdt_device *mdt)
+{
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       struct mdt_thread_info  *cdt_mti = NULL;
+       int                      rc;
+       ENTRY;
+
+       cdt->cdt_state = CDT_STOPPED;
+
+       cfs_waitq_init(&cdt->cdt_thread.t_ctl_waitq);
+       mutex_init(&cdt->cdt_llog_lock);
+       init_rwsem(&cdt->cdt_agent_lock);
+       init_rwsem(&cdt->cdt_request_lock);
+       mutex_init(&cdt->cdt_restore_lock);
+
+       CFS_INIT_LIST_HEAD(&cdt->cdt_requests);
+       CFS_INIT_LIST_HEAD(&cdt->cdt_agents);
+       CFS_INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
+
+       rc = lu_env_init(&cdt->cdt_env, LCT_MD_THREAD);
+       if (rc < 0)
+               RETURN(rc);
+
+       /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
+       rc = lu_context_init(&cdt->cdt_session, LCT_SESSION);
+       if (rc == 0) {
+               lu_context_enter(&cdt->cdt_session);
+               cdt->cdt_env.le_ses = &cdt->cdt_session;
+       } else {
+               lu_env_fini(&cdt->cdt_env);
+               RETURN(rc);
+       }
+
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       LASSERT(cdt_mti != NULL);
+
+       cdt_mti->mti_env = &cdt->cdt_env;
+       cdt_mti->mti_mdt = mdt;
+
+       hsm_init_ucred(mdt_ucred(cdt_mti));
+
+       RETURN(0);
+}
+
+/**
+ * free a coordinator thread
+ * \param mdt [IN] device
+ */
+int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
+{
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       ENTRY;
+
+       lu_context_exit(cdt->cdt_env.le_ses);
+       lu_context_fini(cdt->cdt_env.le_ses);
+
+       lu_env_fini(&cdt->cdt_env);
+
+       RETURN(0);
+}
+
+/**
+ * start a coordinator thread
+ * \param mdt [IN] device
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int mdt_hsm_cdt_start(struct mdt_device *mdt)
+{
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       int                      rc;
+       void                    *ptr;
+       struct mdt_thread_info  *cdt_mti;
+       cfs_task_t              *task;
+       ENTRY;
+
+       /* functions defined but not yet used
+        * this avoid compilation warning
+        */
+       ptr = dump_requests;
+
+       if (cdt->cdt_state != CDT_STOPPED) {
+               CERROR("%s: Coordinator already started\n",
+                      mdt_obd_name(mdt));
+               RETURN(-EALREADY);
+       }
+
+       cdt->cdt_policy = CDT_DEFAULT_POLICY;
+       cdt->cdt_state = CDT_INIT;
+
+       cfs_atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
+       /* just need to be larger than previous one */
+       /* cdt_last_cookie is protected by cdt_llog_lock */
+       cdt->cdt_last_cookie = cfs_time_current_sec();
+       cdt->cdt_loop_period = 10;
+       cdt->cdt_delay = 60;
+       cdt->cdt_timeout = 3600;
+       cdt->cdt_max_request = 3;
+       atomic_set(&cdt->cdt_request_count, 0);
+
+       /* to avoid deadlock when start is made through /proc
+        * /proc entries are created by the coordinator thread */
+
+       /* set up list of started restore requests */
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = mdt_hsm_pending_restore(cdt_mti);
+       if (rc)
+               CERROR("%s: cannot take the layout locks needed"
+                      " for registered restore: %d",
+                      mdt_obd_name(mdt), rc);
+
+       task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               cdt->cdt_state = CDT_STOPPED;
+               CERROR("%s: error starting coordinator thread: %d\n",
+                      mdt_obd_name(mdt), rc);
+               RETURN(rc);
+       } else {
+               CDEBUG(D_HSM, "%s: coordinator thread started\n",
+                      mdt_obd_name(mdt));
+               rc = 0;
+       }
+
+       cfs_wait_event(cdt->cdt_thread.t_ctl_waitq,
+                      (cdt->cdt_thread.t_flags & SVC_RUNNING));
+
+       cdt->cdt_state = CDT_RUNNING;
+       mdt->mdt_opts.mo_coordinator = 1;
+       RETURN(0);
+}
+
+/**
+ * stop a coordinator thread
+ * \param mdt [IN] device
+ */
+int mdt_hsm_cdt_stop(struct mdt_device *mdt)
+{
+       struct coordinator              *cdt = &mdt->mdt_coordinator;
+       struct cdt_agent_req            *car, *tmp1;
+       struct hsm_agent                *ha, *tmp2;
+       struct cdt_restore_handle       *crh, *tmp3;
+       struct mdt_thread_info          *cdt_mti;
+       ENTRY;
+
+       if (cdt->cdt_state == CDT_STOPPED) {
+               CERROR("%s: Coordinator already stopped\n",
+                      mdt_obd_name(mdt));
+               RETURN(-EALREADY);
+       }
+
+       /* remove proc entries */
+       if (cdt->cdt_proc_dir != NULL)
+               lprocfs_remove(&cdt->cdt_proc_dir);
+
+       if (cdt->cdt_state != CDT_STOPPING) {
+               /* stop coordinator thread before cleaning */
+               cdt->cdt_thread.t_flags = SVC_STOPPING;
+               cfs_waitq_signal(&cdt->cdt_thread.t_ctl_waitq);
+               cfs_wait_event(cdt->cdt_thread.t_ctl_waitq,
+                              cdt->cdt_thread.t_flags & SVC_STOPPED);
+       }
+       cdt->cdt_state = CDT_STOPPED;
+
+       /* start cleaning */
+       down_write(&cdt->cdt_request_lock);
+       list_for_each_entry_safe(car, tmp1, &cdt->cdt_requests,
+                                car_request_list) {
+               list_del(&car->car_request_list);
+               mdt_cdt_free_request(car);
+       }
+       up_write(&cdt->cdt_request_lock);
+
+       down_write(&cdt->cdt_agent_lock);
+       list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
+               list_del(&ha->ha_list);
+               OBD_FREE_PTR(ha);
+       }
+       up_write(&cdt->cdt_agent_lock);
+
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       mutex_lock(&cdt->cdt_restore_lock);
+       list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
+               struct mdt_object       *child;
+
+               /* give back layout lock */
+               child = mdt_object_find(&cdt->cdt_env, mdt, &crh->crh_fid);
+               if (!IS_ERR(child))
+                       mdt_object_unlock_put(cdt_mti, child, &crh->crh_lh, 1);
+
+               list_del(&crh->crh_list);
+
+               OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+       }
+       mutex_unlock(&cdt->cdt_restore_lock);
+
+       mdt->mdt_opts.mo_coordinator = 0;
+
+       RETURN(0);
+}
+
+/**
+ * register all requests from an hal in the memory list
+ * \param mti [IN] context
+ * \param hal [IN] request
+ * \param uuid [OUT] in case of CANCEL, the uuid of the agent
+ *  which is running the CT
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int mdt_hsm_add_hal(struct mdt_thread_info *mti,
+                   struct hsm_action_list *hal, struct obd_uuid *uuid)
+{
+       struct mdt_device       *mdt = mti->mti_mdt;
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       struct hsm_action_item  *hai;
+       int                      rc = 0, i;
+       ENTRY;
+
+       /* register request in memory list */
+       hai = hai_zero(hal);
+       for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
+               struct cdt_agent_req *car;
+
+               /* in case of a cancel request, we first mark the ondisk
+                * record of the request we want to stop as canceled
+                * this does not change the cancel record
+                * it will be done when updating the request status
+                */
+               if (hai->hai_action == HSMA_CANCEL) {
+                       rc = mdt_agent_record_update(mti->mti_env, mti->mti_mdt,
+                                                    &hai->hai_cookie,
+                                                    1, ARS_CANCELED);
+                       if (rc) {
+                               CERROR("%s: mdt_agent_record_update() failed, "
+                                      "rc=%d, cannot update status to %s "
+                                      "for cookie "LPX64"\n",
+                                      mdt_obd_name(mdt), rc,
+                                      agent_req_status2name(ARS_CANCELED),
+                                      hai->hai_cookie);
+                               GOTO(out, rc);
+                       }
+
+                       /* find the running request to set it canceled */
+                       car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
+                       if (car != NULL) {
+                               car->car_canceled = 1;
+                               /* uuid has to be changed to the one running the
+                               * request to cancel */
+                               *uuid = car->car_uuid;
+                               mdt_cdt_put_request(car);
+                       }
+                       /* no need to memorize cancel request
+                        * this also avoid a deadlock when we receive
+                        * a purge all requests command
+                        */
+                       continue;
+               }
+
+               if (hai->hai_action == HSMA_ARCHIVE) {
+                       struct mdt_object *obj;
+                       struct md_hsm hsm;
+
+                       obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
+                       if (IS_ERR(obj) && (PTR_ERR(obj) == -ENOENT))
+                               continue;
+                       if (IS_ERR(obj))
+                               GOTO(out, rc = PTR_ERR(obj));
+
+                       hsm.mh_flags |= HS_EXISTS;
+                       hsm.mh_arch_id = hal->hal_archive_id;
+                       rc = mdt_hsm_attr_set(mti, obj, &hsm);
+                       mdt_object_put(mti->mti_env, obj);
+                       if (rc)
+                               GOTO(out, rc);
+               }
+
+               car = mdt_cdt_alloc_request(hal->hal_compound_id,
+                                           hal->hal_archive_id, hal->hal_flags,
+                                           uuid, hai);
+               if (IS_ERR(car))
+                       GOTO(out, rc = PTR_ERR(car));
+
+               rc = mdt_cdt_add_request(cdt, car);
+               if (rc != 0)
+                       mdt_cdt_free_request(car);
+       }
+out:
+       RETURN(rc);
+}
+
+/**
+ * swap layouts between 2 fids
+ * \param mti [IN] context
+ * \param fid1 [IN]
+ * \param fid2 [IN]
+ */
+static int hsm_swap_layouts(struct mdt_thread_info *mti,
+                           const lustre_fid *fid, const lustre_fid *dfid)
+{
+       struct mdt_device       *mdt = mti->mti_mdt;
+       struct mdt_object       *child1, *child2;
+       struct mdt_lock_handle  *lh2;
+       int                      rc;
+       ENTRY;
+
+       child1 = mdt_object_find(mti->mti_env, mdt, fid);
+       if (IS_ERR(child1))
+               GOTO(out, rc = PTR_ERR(child1));
+
+       /* we already have layout lock on FID so take only
+        * on dfid */
+       lh2 = &mti->mti_lh[MDT_LH_OLD];
+       mdt_lock_reg_init(lh2, LCK_EX);
+       child2 = mdt_object_find_lock(mti, dfid, lh2, MDS_INODELOCK_LAYOUT);
+       if (IS_ERR(child2))
+               GOTO(out_child1, rc = PTR_ERR(child2));
+
+       /* if copy tool closes the volatile before sending the final
+        * progress through llapi_hsm_copy_end(), all the objects
+        * are removed and mdd_swap_layout LBUG */
+       if (mdt_object_exists(child2)) {
+               rc = mo_swap_layouts(mti->mti_env, mdt_object_child(child1),
+                                    mdt_object_child(child2), 0);
+       } else {
+               CERROR("%s: Copytool has closed volatile file "DFID"\n",
+                      mdt_obd_name(mti->mti_mdt), PFID(dfid));
+               rc = -ENOENT;
+       }
+
+       mdt_object_unlock_put(mti, child2, lh2, 1);
+out_child1:
+       mdt_object_put(mti->mti_env, child1);
+out:
+       RETURN(rc);
+}
+
+/**
+ * update status of a completed request
+ * \param mti [IN] context
+ * \param pgs [IN] progress of the copy tool
+ * \param update_record [IN] update llog record
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
+                                    struct hsm_progress_kernel *pgs,
+                                    const struct cdt_agent_req *car,
+                                    enum agent_req_status *status)
+{
+       const struct lu_env     *env = mti->mti_env;
+       struct mdt_device       *mdt = mti->mti_mdt;
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       struct mdt_object       *obj = NULL;
+       int                      cl_flags = 0, rc = 0;
+       struct md_hsm            mh;
+       bool                     is_mh_changed;
+       ENTRY;
+
+       /* default is to retry */
+       *status = ARS_WAITING;
+
+       /* find object by FID */
+       obj = mdt_hsm_get_md_hsm(mti, &car->car_hai->hai_fid, &mh);
+       /* we will update MD HSM only if needed */
+       is_mh_changed = false;
+       if (IS_ERR(obj)) {
+               /* object removed */
+               *status = ARS_SUCCEED;
+               goto unlock;
+       }
+
+       /* no need to change mh->mh_arch_id
+        * mdt_hsm_get_md_hsm() got it from disk and it is still valid
+        */
+       if (pgs->hpk_errval != 0) {
+               switch (pgs->hpk_errval) {
+               case ENOSYS:
+                       /* the copy tool does not support cancel
+                        * so the cancel request is failed
+                        * As we cannot distinguish a cancel progress
+                        * from another action progress (they have the
+                        * same cookie), we suppose here the CT returns
+                        * ENOSYS only if does not support cancel
+                        */
+                       /* this can also happen when cdt calls it to
+                        * for a timeouted request */
+                       *status = ARS_FAILED;
+                       /* to have a cancel event in changelog */
+                       pgs->hpk_errval = ECANCELED;
+                       break;
+               case ECANCELED:
+                       /* the request record has already been set to
+                        * ARS_CANCELED, this set the cancel request
+                        * to ARS_SUCCEED */
+                       *status = ARS_SUCCEED;
+                       break;
+               default:
+                       *status = (((cdt->cdt_policy &
+                                  CDT_NORETRY_ACTION) ||
+                                  !(pgs->hpk_flags & HP_FLAG_RETRY)) ?
+                                  ARS_FAILED : ARS_WAITING);
+                       break;
+               }
+
+               if (pgs->hpk_errval > CLF_HSM_MAXERROR) {
+                       CERROR("%s: Request "LPX64" on "DFID
+                              " failed, error code %d too large\n",
+                              mdt_obd_name(mdt),
+                              pgs->hpk_cookie, PFID(&pgs->hpk_fid),
+                              pgs->hpk_errval);
+                       hsm_set_cl_error(&cl_flags,
+                                        CLF_HSM_ERROVERFLOW);
+                       rc = -EINVAL;
+               } else {
+                       hsm_set_cl_error(&cl_flags, pgs->hpk_errval);
+               }
+
+               switch (car->car_hai->hai_action) {
+               case HSMA_ARCHIVE:
+                       hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
+                       break;
+               case HSMA_RESTORE:
+                       hsm_set_cl_event(&cl_flags, HE_RESTORE);
+                       break;
+               case HSMA_REMOVE:
+                       hsm_set_cl_event(&cl_flags, HE_REMOVE);
+                       break;
+               case HSMA_CANCEL:
+                       hsm_set_cl_event(&cl_flags, HE_CANCEL);
+                       CERROR("%s: Failed request "LPX64" on "DFID
+                              " cannot be a CANCEL\n",
+                              mdt_obd_name(mdt),
+                              pgs->hpk_cookie,
+                              PFID(&pgs->hpk_fid));
+                       break;
+               default:
+                       CERROR("%s: Failed request "LPX64" on "DFID
+                              " %d is an unknown action\n",
+                              mdt_obd_name(mdt),
+                              pgs->hpk_cookie, PFID(&pgs->hpk_fid),
+                              car->car_hai->hai_action);
+                       rc = -EINVAL;
+                       break;
+               }
+       } else {
+               *status = ARS_SUCCEED;
+               switch (car->car_hai->hai_action) {
+               case HSMA_ARCHIVE:
+                       hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
+                       /* set ARCHIVE keep EXIST and clear LOST and
+                        * DIRTY */
+                       mh.mh_arch_ver = pgs->hpk_data_version;
+                       mh.mh_flags |= HS_ARCHIVED;
+                       mh.mh_flags &= ~(HS_LOST|HS_DIRTY);
+                       is_mh_changed = true;
+                       break;
+               case HSMA_RESTORE:
+                       hsm_set_cl_event(&cl_flags, HE_RESTORE);
+
+                       /* clear RELEASED and DIRTY */
+                       mh.mh_flags &= ~(HS_RELEASED | HS_DIRTY);
+                       /* Restoring has changed the file version on
+                        * disk. */
+                       mh.mh_arch_ver = pgs->hpk_data_version;
+                       is_mh_changed = true;
+                       break;
+               case HSMA_REMOVE:
+                       hsm_set_cl_event(&cl_flags, HE_REMOVE);
+                       /* clear ARCHIVED EXISTS and LOST */
+                       mh.mh_flags &= ~(HS_ARCHIVED | HS_EXISTS | HS_LOST);
+                       is_mh_changed = true;
+                       break;
+               case HSMA_CANCEL:
+                       hsm_set_cl_event(&cl_flags, HE_CANCEL);
+                       CERROR("%s: Successful request "LPX64
+                              " on "DFID
+                              " cannot be a CANCEL\n",
+                              mdt_obd_name(mdt),
+                              pgs->hpk_cookie,
+                              PFID(&pgs->hpk_fid));
+                       break;
+               default:
+                       CERROR("%s: Successful request "LPX64
+                              " on "DFID
+                              " %d is an unknown action\n",
+                              mdt_obd_name(mdt),
+                              pgs->hpk_cookie, PFID(&pgs->hpk_fid),
+                              car->car_hai->hai_action);
+                       rc = -EINVAL;
+                       break;
+               }
+       }
+
+       /* rc != 0 means error when analysing action, it may come from
+        * a crasy CT no need to manage DIRTY
+        */
+       if (rc == 0)
+               hsm_set_cl_flags(&cl_flags, ((mh.mh_flags & HS_DIRTY) ?
+                                            CLF_HSM_DIRTY : 0));
+
+       /* unlock is done later, after layout lock management */
+       if (is_mh_changed)
+               rc = mdt_hsm_attr_set(mti, obj, &mh);
+
+unlock:
+       /* we give back layout lock only if restore was successful or
+        * if restore was canceled or if policy is to not retry
+        * in other cases we just unlock the object */
+       if ((car->car_hai->hai_action == HSMA_RESTORE) &&
+           ((pgs->hpk_errval == 0) || (pgs->hpk_errval == ECANCELED) ||
+            (cdt->cdt_policy & CDT_NORETRY_ACTION))) {
+               struct cdt_restore_handle       *crh;
+
+               /* restore in data FID done, we swap the layouts
+                * only if restore is successfull */
+               if (pgs->hpk_errval == 0) {
+                       rc = hsm_swap_layouts(mti, &car->car_hai->hai_fid,
+                                             &car->car_hai->hai_dfid);
+                       if (rc) {
+                               if (cdt->cdt_policy & CDT_NORETRY_ACTION)
+                                       *status = ARS_FAILED;
+                               pgs->hpk_errval = -rc;
+                       }
+               }
+               /* we have to retry, so keep layout lock */
+               if (*status == ARS_WAITING)
+                       GOTO(out, rc);
+
+               /* give back layout lock */
+               mutex_lock(&cdt->cdt_restore_lock);
+               crh = hsm_restore_hdl_find(cdt, &car->car_hai->hai_fid);
+               if (crh != NULL)
+                       list_del(&crh->crh_list);
+               mutex_unlock(&cdt->cdt_restore_lock);
+               /* just give back layout lock, we keep
+                * the reference which is given back
+                * later with the lock for HSM flags */
+               if (!IS_ERR(obj))
+                       mdt_object_unlock(mti, obj, &crh->crh_lh, 1);
+               if (crh != NULL)
+                       OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+       }
+
+       GOTO(out, rc);
+
+out:
+       if ((obj != NULL) && !IS_ERR(obj)) {
+               mo_changelog(env, CL_HSM, cl_flags,
+                            mdt_object_child(obj));
+               mdt_object_put(mti->mti_env, obj);
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * update status of a request
+ * \param mti [IN] context
+ * \param pgs [IN] progress of the copy tool
+ * \param update_record [IN] update llog record
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
+                                struct hsm_progress_kernel *pgs,
+                                const int update_record)
+{
+       struct mdt_device       *mdt = mti->mti_mdt;
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       struct cdt_agent_req    *car;
+       int                      rc = 0;
+       ENTRY;
+
+       /* no coordinator started, so we cannot serve requests */
+       if (cdt->cdt_state == CDT_STOPPED)
+               RETURN(-EAGAIN);
+
+       /* first do sanity checks */
+       car = mdt_cdt_update_request(cdt, pgs);
+       if (IS_ERR(car)) {
+               CERROR("%s: Cannot find running request for cookie "LPX64
+                      " on fid="DFID"\n",
+                      mdt_obd_name(mdt),
+                      pgs->hpk_cookie, PFID(&pgs->hpk_fid));
+               RETURN(PTR_ERR(car));
+       }
+
+       CDEBUG(D_HSM, "Progress received for fid="DFID" cookie="LPX64
+                     " action=%s flags=%d err=%d fid="DFID" dfid="DFID"\n",
+                     PFID(&pgs->hpk_fid), pgs->hpk_cookie,
+                     hsm_copytool_action2name(car->car_hai->hai_action),
+                     pgs->hpk_flags, pgs->hpk_errval,
+                     PFID(&car->car_hai->hai_fid),
+                     PFID(&car->car_hai->hai_dfid));
+
+       /* progress is done on FID or data FID depending of the action and
+        * of the copy progress */
+       /* for restore progress is used to send back the data FID to cdt */
+       if ((car->car_hai->hai_action == HSMA_RESTORE) &&
+           (lu_fid_eq(&car->car_hai->hai_fid, &car->car_hai->hai_dfid)))
+               car->car_hai->hai_dfid = pgs->hpk_fid;
+
+       if (((car->car_hai->hai_action == HSMA_RESTORE) ||
+            (car->car_hai->hai_action == HSMA_ARCHIVE)) &&
+           (!lu_fid_eq(&pgs->hpk_fid, &car->car_hai->hai_dfid) &&
+            !lu_fid_eq(&pgs->hpk_fid, &car->car_hai->hai_fid))) {
+               CERROR("%s: Progress on "DFID" for cookie "LPX64
+                      " does not match request FID "DFID" nor data FID "
+                      DFID"\n",
+                      mdt_obd_name(mdt),
+                      PFID(&pgs->hpk_fid), pgs->hpk_cookie,
+                      PFID(&car->car_hai->hai_fid),
+                      PFID(&car->car_hai->hai_dfid));
+               GOTO(out, rc = -EINVAL);
+       }
+
+       if (pgs->hpk_errval != 0 && !(pgs->hpk_flags & HP_FLAG_COMPLETED)) {
+               CERROR("%s: Progress on "DFID" for cookie "LPX64" action=%s"
+                      " is not coherent (err=%d and not completed"
+                      " (flags=%d))\n",
+                      mdt_obd_name(mdt),
+                      PFID(&pgs->hpk_fid), pgs->hpk_cookie,
+                      hsm_copytool_action2name(car->car_hai->hai_action),
+                      pgs->hpk_errval, pgs->hpk_flags);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* now progress is valid */
+
+       /* we use a root like ucred */
+       hsm_init_ucred(mdt_ucred(mti));
+
+       if (pgs->hpk_flags & HP_FLAG_COMPLETED) {
+               enum agent_req_status    status;
+
+               rc = hsm_cdt_request_completed(mti, pgs, car, &status);
+
+               /* remove request from memory list */
+               mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
+
+               CDEBUG(D_HSM, "Updating record: fid="DFID" cookie="LPX64
+                             " action=%s status=%s\n", PFID(&pgs->hpk_fid),
+                      pgs->hpk_cookie,
+                      hsm_copytool_action2name(car->car_hai->hai_action),
+                      agent_req_status2name(status));
+
+               if (update_record) {
+                       int rc1;
+
+                       rc1 = mdt_agent_record_update(mti->mti_env, mdt,
+                                                    &pgs->hpk_cookie, 1,
+                                                    status);
+                       if (rc1)
+                               CERROR("%s: mdt_agent_record_update() failed,"
+                                      " rc=%d, cannot update status to %s"
+                                      " for cookie "LPX64"\n",
+                                      mdt_obd_name(mdt), rc1,
+                                      agent_req_status2name(status),
+                                      pgs->hpk_cookie);
+                       rc = (rc != 0 ? rc : rc1);
+               }
+               /* ct has completed a request, so a slot is available, wakeup
+                * cdt to find new work */
+               mdt_hsm_cdt_wakeup(mdt);
+       } else {
+               /* if copytool send a progress on a canceled request
+                * we inform copytool it should stop
+                */
+               if (car->car_canceled == 1)
+                       rc = -ECANCELED;
+       }
+       GOTO(out, rc);
+
+out:
+       /* remove ref got from mdt_cdt_update_request() */
+       mdt_cdt_put_request(car);
+
+       return rc;
+}
+
+
+/**
+ * data passed to llog_cat_process() callback
+ * to cancel requests
+ */
+struct hsm_cancel_all_data {
+       struct mdt_device       *mdt;
+};
+
+/**
+ *  llog_cat_process() callback, used to:
+ *  - purge all requests
+ * \param env [IN] environment
+ * \param llh [IN] llog handle
+ * \param hdr [IN] llog record
+ * \param data [IN] cb data = struct hsm_cancel_all_data
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int mdt_cancel_all_cb(const struct lu_env *env,
+                            struct llog_handle *llh,
+                            struct llog_rec_hdr *hdr, void *data)
+{
+       struct llog_agent_req_rec       *larr;
+       struct hsm_cancel_all_data      *hcad;
+       int                              rc = 0;
+       ENTRY;
+
+       larr = (struct llog_agent_req_rec *)hdr;
+       hcad = data;
+       if ((larr->arr_status == ARS_WAITING) ||
+           (larr->arr_status == ARS_STARTED)) {
+               larr->arr_status = ARS_CANCELED;
+               larr->arr_req_change = cfs_time_current_sec();
+               rc = mdt_agent_llog_update_rec(env, hcad->mdt, llh, larr);
+               if (rc == 0)
+                       RETURN(LLOG_DEL_RECORD);
+       }
+       RETURN(rc);
+}
+
+/**
+ * cancel all actions
+ * \param obd [IN] MDT device
+ */
+static int hsm_cancel_all_actions(struct mdt_device *mdt)
+{
+       struct mdt_thread_info          *mti;
+       struct coordinator              *cdt = &mdt->mdt_coordinator;
+       struct cdt_agent_req            *car;
+       struct hsm_action_list          *hal = NULL;
+       struct hsm_action_item          *hai;
+       struct hsm_cancel_all_data       hcad;
+       int                              hal_sz = 0, hal_len, rc;
+       enum cdt_states                  save_state;
+       ENTRY;
+
+       /* retrieve coordinator context */
+       mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+
+       /* disable coordinator */
+       save_state = cdt->cdt_state;
+       cdt->cdt_state = CDT_DISABLE;
+
+       /* send cancel to all running requests */
+       down_read(&cdt->cdt_request_lock);
+       list_for_each_entry(car, &cdt->cdt_requests, car_request_list) {
+               mdt_cdt_get_request(car);
+               /* request is not yet removed from list, it will be done
+                * when copytool will return progress
+                */
+
+               if (car->car_hai->hai_action == HSMA_CANCEL) {
+                       mdt_cdt_put_request(car);
+                       continue;
+               }
+
+               /* needed size */
+               hal_len = sizeof(*hal) + cfs_size_round(MTI_NAME_MAXLEN + 1) +
+                         cfs_size_round(car->car_hai->hai_len);
+
+               if ((hal_len > hal_sz) && (hal_sz > 0)) {
+                       /* not enough room, free old buffer */
+                       OBD_FREE(hal, hal_sz);
+                       hal = NULL;
+               }
+
+               /* empty buffer, allocate one */
+               if (hal == NULL) {
+                       hal_sz = hal_len;
+                       OBD_ALLOC(hal, hal_sz);
+                       if (hal == NULL) {
+                               mdt_cdt_put_request(car);
+                               up_read(&cdt->cdt_request_lock);
+                               GOTO(out, rc = -ENOMEM);
+                       }
+               }
+
+               hal->hal_version = HAL_VERSION;
+               obd_uuid2fsname(hal->hal_fsname, mdt_obd_name(mdt),
+                               MTI_NAME_MAXLEN);
+               hal->hal_fsname[MTI_NAME_MAXLEN] = '\0';
+               hal->hal_compound_id = car->car_compound_id;
+               hal->hal_archive_id = car->car_archive_id;
+               hal->hal_flags = car->car_flags;
+               hal->hal_count = 0;
+
+               hai = hai_zero(hal);
+               memcpy(hai, car->car_hai, car->car_hai->hai_len);
+               hai->hai_action = HSMA_CANCEL;
+               hal->hal_count = 1;
+
+               /* it is possible to safely call mdt_hsm_agent_send()
+                * (ie without a deadlock on cdt_request_lock), because the
+                * write lock is taken only if we are not in purge mode
+                * (mdt_hsm_agent_send() does not call mdt_cdt_add_request()
+                *   nor mdt_cdt_remove_request())
+                */
+               /* no conflict with cdt thread because cdt is disable and we
+                * have the request lock */
+               mdt_hsm_agent_send(mti, hal, 1);
+
+               mdt_cdt_put_request(car);
+       }
+       up_read(&cdt->cdt_request_lock);
+
+       if (hal != NULL)
+               OBD_FREE(hal, hal_sz);
+
+       /* cancel all on-disk records */
+       hcad.mdt = mdt;
+
+       rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
+                             mdt_cancel_all_cb, &hcad);
+out:
+       /* enable coordinator */
+       cdt->cdt_state = save_state;
+
+       RETURN(rc);
+}
+
+/**
+ * check if a request is comptaible with file status
+ * \param hai [IN] request description
+ * \param hal_an [IN] request archive number (not used)
+ * \param rq_flags [IN] request flags
+ * \param hsm [IN] file HSM metadata
+ * \retval boolean
+ */
+bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
+                             const int hal_an, const __u64 rq_flags,
+                             const struct md_hsm *hsm)
+{
+       int      is_compat = false;
+       int      hsm_flags;
+       ENTRY;
+
+       hsm_flags = hsm->mh_flags;
+       switch (hai->hai_action) {
+       case HSMA_ARCHIVE:
+               if (!(hsm_flags & HS_NOARCHIVE) &&
+                   ((hsm_flags & HS_DIRTY) || !(hsm_flags & HS_ARCHIVED)))
+                       is_compat = true;
+               break;
+       case HSMA_RESTORE:
+               if (!(hsm_flags & HS_DIRTY) && (hsm_flags & HS_RELEASED) &&
+                   (hsm_flags & HS_ARCHIVED) && !(hsm_flags & HS_LOST))
+                       is_compat = true;
+               break;
+       case HSMA_REMOVE:
+               if (!(hsm_flags & HS_RELEASED) &&
+                   (hsm_flags & (HS_ARCHIVED | HS_EXISTS)))
+                       is_compat = true;
+               break;
+       case HSMA_CANCEL:
+               is_compat = true;
+               break;
+       }
+       CDEBUG(D_HSM, "fid="DFID" action=%s flags="LPX64
+                     " extent="LPX64"-"LPX64" hsm_flags=%.8X %s\n",
+                     PFID(&hai->hai_fid),
+                     hsm_copytool_action2name(hai->hai_action), rq_flags,
+                     hai->hai_extent.offset, hai->hai_extent.length,
+                     hsm->mh_flags,
+                     (is_compat ? "compatible" : "uncompatible"));
+
+       RETURN(is_compat);
+}
+
+/*
+ * /proc interface used to get/set HSM behaviour (cdt->cdt_policy)
+ */
+static const struct {
+       __u64            bit;
+       char            *name;
+       char            *nickname;
+} hsm_policy_names[] = {
+       { CDT_NONBLOCKING_RESTORE,      "non_blocking_restore", "nbr"},
+       { CDT_NORETRY_ACTION,           "no_retry_action",      "nra"},
+       { 0 },
+};
+
+/**
+ * convert a policy name to a bit
+ * \param name [IN] policy name
+ * \retval 0 unknown
+ * \retval   policy bit
+ */
+static __u64 hsm_policy_str2bit(const char *name)
+{
+       int      i;
+
+       for (i = 0; hsm_policy_names[i].bit != 0; i++)
+               if (strcmp(hsm_policy_names[i].nickname, name) == 0)
+                       return hsm_policy_names[i].bit;
+       return 0;
+}
+
+/**
+ * convert a policy bit field to a string
+ * \param mask [IN] policy bit field
+ * \param buffer [OUT] string
+ * \param count [IN] size of buffer
+ * \retval size filled in buffer
+ */
+static int hsm_policy_bit2str(const __u64 mask, char *buffer, int count)
+{
+       int      i, j, sz;
+       char    *ptr;
+       __u64    bit;
+       ENTRY;
+
+       ptr = buffer;
+       sz = snprintf(buffer, count, "("LPX64") ", mask);
+       ptr += sz;
+       count -= sz;
+       for (i = 0; i < (sizeof(mask) * 8); i++) {
+               bit = (1ULL << i);
+               if (!(bit  & mask))
+                       continue;
+
+               for (j = 0; hsm_policy_names[j].bit != 0; j++) {
+                       if (hsm_policy_names[j].bit == bit) {
+                               sz = snprintf(ptr, count, "%s(%s) ",
+                                             hsm_policy_names[j].name,
+                                             hsm_policy_names[j].nickname);
+                               ptr += sz;
+                               count -= sz;
+                               break;
+                       }
+               }
+       }
+       RETURN(ptr - buffer);
+}
+
+/* methods to read/write HSM policy flags */
+static int lprocfs_rd_hsm_policy(char *page, char **start, off_t off,
+                                int count, int *eof, void *data)
+{
+       struct mdt_device       *mdt = data;
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       int                      sz;
+       ENTRY;
+
+       sz = hsm_policy_bit2str(cdt->cdt_policy, page, count);
+       page[sz] = '\n';
+       sz++;
+       page[sz] = '\0';
+       *eof = 1;
+       RETURN(sz);
+}
+
+static int lprocfs_wr_hsm_policy(struct file *file, const char *buffer,
+                                unsigned long count, void *data)
+{
+       struct mdt_device       *mdt = data;
+       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       int                      sz;
+       char                    *start, *end;
+       __u64                    policy;
+       int                      set;
+       char                    *buf;
+       ENTRY;
+
+       if (strncmp(buffer, "help", 4) == 0) {
+               sz = PAGE_SIZE;
+               OBD_ALLOC(buf, sz);
+               if (!buf)
+                       RETURN(-ENOMEM);
+
+               hsm_policy_bit2str(CDT_POLICY_MASK, buf, sz);
+               CWARN("Supported policies are: %s\n", buf);
+               OBD_FREE(buf, sz);
+               RETURN(count);
+       }
+
+       OBD_ALLOC(buf, count + 1);
+       if (buf == NULL)
+               RETURN(-ENOMEM);
+
+       if (copy_from_user(buf, buffer, count))
+               RETURN(-EFAULT);
+
+       buf[count] = '\0';
+       start = buf;
+
+       policy = 0;
+       do {
+               end = strchr(start, ' ');
+               if (end != NULL)
+                       *end = '\0';
+               switch (*start) {
+               case '-':
+                       start++;
+                       set = 0;
+                       break;
+               case '+':
+                       start++;
+                       set = 1;
+                       break;
+               default:
+                       set = 2;
+                       break;
+               }
+               policy = hsm_policy_str2bit(start);
+               if (!policy)
+                       break;
+
+               switch (set) {
+               case 0:
+                       cdt->cdt_policy &= ~policy;
+                       break;
+               case 1:
+                       cdt->cdt_policy |= policy;
+                       break;
+               case 2:
+                       cdt->cdt_policy = policy;
+                       break;
+               }
+
+               start = end + 1;
+       } while (end != NULL);
+       OBD_FREE(buf, count + 1);
+       RETURN(count);
+}
+
+#define GENERATE_PROC_METHOD(VAR)                                      \
+static int lprocfs_rd_hsm_##VAR(char *page, char **start, off_t off,   \
+                               int count, int *eof, void *data)        \
+{                                                                      \
+       struct mdt_device       *mdt = data;                            \
+       struct coordinator      *cdt = &mdt->mdt_coordinator;           \
+       int                      sz;                                    \
+       ENTRY;                                                          \
+                                                                       \
+       sz = snprintf(page, count, LPU64"\n", (__u64)cdt->VAR);         \
+       *eof = 1;                                                       \
+       RETURN(sz);                                                     \
+}                                                                      \
+static int lprocfs_wr_hsm_##VAR(struct file *file, const char *buffer, \
+                               unsigned long count, void *data)        \
+                                                                       \
+{                                                                      \
+       struct mdt_device       *mdt = data;                            \
+       struct coordinator      *cdt = &mdt->mdt_coordinator;           \
+       int                      val;                                   \
+       int                      rc;                                    \
+       ENTRY;                                                          \
+                                                                       \
+       rc = lprocfs_write_helper(buffer, count, &val);                 \
+       if (rc)                                                         \
+               RETURN(rc);                                             \
+       if (val > 0) {                                                  \
+               cdt->VAR = val;                                         \
+               RETURN(count);                                          \
+       }                                                               \
+       RETURN(-EINVAL);                                                \
+}
+
+GENERATE_PROC_METHOD(cdt_loop_period)
+GENERATE_PROC_METHOD(cdt_delay)
+GENERATE_PROC_METHOD(cdt_timeout)
+GENERATE_PROC_METHOD(cdt_max_request)
+
+/*
+ * procfs write method for MDT/hsm_control
+ * proc entry is in mdt directory so data is mdt obd_device pointer
+ */
+#define CDT_ENABLE_CMD   "enabled"
+#define CDT_STOP_CMD     "shutdown"
+#define CDT_DISABLE_CMD  "disabled"
+#define CDT_PURGE_CMD    "purge"
+#define CDT_HELP_CMD     "help"
+
+int lprocfs_wr_hsm_cdt_control(struct file *file, const char *buffer,
+                              unsigned long count, void *data)
+{
+       struct obd_device       *obd = data;
+       struct mdt_device       *mdt = mdt_dev(obd->obd_lu_dev);
+       struct coordinator      *cdt = &(mdt->mdt_coordinator);
+       int                      rc, usage = 0;
+       ENTRY;
+
+       rc = 0;
+       if (strncmp(buffer, CDT_ENABLE_CMD, strlen(CDT_ENABLE_CMD)) == 0) {
+               if (cdt->cdt_state == CDT_DISABLE) {
+                       cdt->cdt_state = CDT_RUNNING;
+                       mdt_hsm_cdt_wakeup(mdt);
+               } else {
+                       rc = mdt_hsm_cdt_start(mdt);
+               }
+       } else if (strncmp(buffer, CDT_STOP_CMD, strlen(CDT_STOP_CMD)) == 0) {
+               cdt->cdt_state = CDT_STOPPING;
+       } else if (strncmp(buffer, CDT_DISABLE_CMD,
+                          strlen(CDT_DISABLE_CMD)) == 0) {
+               cdt->cdt_state = CDT_DISABLE;
+       } else if (strncmp(buffer, CDT_PURGE_CMD, strlen(CDT_PURGE_CMD)) == 0) {
+               rc = hsm_cancel_all_actions(mdt);
+       } else if (strncmp(buffer, CDT_HELP_CMD, strlen(CDT_HELP_CMD)) == 0) {
+               usage = 1;
+       } else {
+               usage = 1;
+               rc = -EINVAL;
+       }
+
+       if (usage == 1)
+               CERROR("%s: Valid coordinator control commands are: "
+                      "%s %s %s %s %s\n", mdt_obd_name(mdt),
+                      CDT_ENABLE_CMD, CDT_STOP_CMD, CDT_DISABLE_CMD,
+                      CDT_PURGE_CMD, CDT_HELP_CMD);
+
+       if (rc)
+               RETURN(rc);
+
+       RETURN(count);
+}
+
+int lprocfs_rd_hsm_cdt_control(char *page, char **start, off_t off,
+                              int count, int *eof, void *data)
+{
+       struct obd_device       *obd = data;
+       struct coordinator      *cdt;
+       int                      sz;
+       ENTRY;
+
+       cdt = &(mdt_dev(obd->obd_lu_dev)->mdt_coordinator);
+       *eof = 1;
+
+       if (cdt->cdt_state == CDT_INIT)
+               sz = snprintf(page, count, "init\n");
+       else if (cdt->cdt_state == CDT_RUNNING)
+               sz = snprintf(page, count, "enabled\n");
+       else if (cdt->cdt_state == CDT_STOPPING)
+               sz = snprintf(page, count, "stopping\n");
+       else if (cdt->cdt_state == CDT_STOPPED)
+               sz = snprintf(page, count, "stopped\n");
+       else if (cdt->cdt_state == CDT_DISABLE)
+               sz = snprintf(page, count, "disabled\n");
+       else
+               sz = snprintf(page, count, "unknown\n");
+
+       RETURN(sz);
+}
+
+static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
+       { "agents",             NULL, NULL, NULL, &mdt_hsm_agent_fops, 0 },
+       { "agent_actions",      NULL, NULL, NULL,
+                               &mdt_agent_actions_fops, 0444 },
+       { "grace_delay",        lprocfs_rd_hsm_cdt_delay,
+                               lprocfs_wr_hsm_cdt_delay,
+                               NULL, NULL, 0 },
+       { "loop_period",        lprocfs_rd_hsm_cdt_loop_period,
+                               lprocfs_wr_hsm_cdt_loop_period,
+                               NULL, NULL, 0 },
+       { "max_requests",       lprocfs_rd_hsm_cdt_max_request,
+                               lprocfs_wr_hsm_cdt_max_request,
+                               NULL, NULL, 0 },
+       { "policy",             lprocfs_rd_hsm_policy, lprocfs_wr_hsm_policy,
+                               NULL, NULL, 0 },
+       { "request_timeout",    lprocfs_rd_hsm_cdt_timeout,
+                               lprocfs_wr_hsm_cdt_timeout,
+                               NULL, NULL, 0 },
+       { "requests",           NULL, NULL, NULL, &mdt_hsm_request_fops, 0 },
+       { 0 }
+};
index d997413..9ec0404 100644 (file)
@@ -100,6 +100,12 @@ static const struct lu_object_operations mdt_obj_ops;
 /* Slab for MDT object allocation */
 static struct kmem_cache *mdt_object_kmem;
 
 /* Slab for MDT object allocation */
 static struct kmem_cache *mdt_object_kmem;
 
+/* For HSM restore handles */
+struct kmem_cache *mdt_hsm_cdt_kmem;
+
+/* For HSM request handles */
+struct kmem_cache *mdt_hsm_car_kmem;
+
 static struct lu_kmem_descr mdt_caches[] = {
        {
                .ckd_cache = &mdt_object_kmem,
 static struct lu_kmem_descr mdt_caches[] = {
        {
                .ckd_cache = &mdt_object_kmem,
@@ -107,6 +113,16 @@ static struct lu_kmem_descr mdt_caches[] = {
                .ckd_size  = sizeof(struct mdt_object)
        },
        {
                .ckd_size  = sizeof(struct mdt_object)
        },
        {
+               .ckd_cache      = &mdt_hsm_cdt_kmem,
+               .ckd_name       = "mdt_cdt_restore_handle",
+               .ckd_size       = sizeof(struct cdt_restore_handle)
+       },
+       {
+               .ckd_cache      = &mdt_hsm_car_kmem,
+               .ckd_name       = "mdt_cdt_agent_req",
+               .ckd_size       = sizeof(struct cdt_agent_req)
+       },
+       {
                .ckd_cache = NULL
        }
 };
                .ckd_cache = NULL
        }
 };
@@ -4957,13 +4973,15 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         cfs_timer_init(&m->mdt_ck_timer, mdt_ck_timer_callback, m);
 
        rc = mdt_hsm_cdt_init(m);
         cfs_timer_init(&m->mdt_ck_timer, mdt_ck_timer_callback, m);
 
        rc = mdt_hsm_cdt_init(m);
-       if (rc != 0)
-               CERROR("%s: Cannot init coordinator, rc %d\n",
+       if (rc != 0) {
+               CERROR("%s: error initializing coordinator, rc %d\n",
                       mdt_obd_name(m), rc);
                       mdt_obd_name(m), rc);
+                GOTO(err_free_ns, rc);
+       }
 
         rc = mdt_ck_thread_start(m);
         if (rc)
 
         rc = mdt_ck_thread_start(m);
         if (rc)
-                GOTO(err_free_ns, rc);
+                GOTO(err_free_hsm, rc);
 
        rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice,
                      OBD_FAIL_MDS_ALL_REQUEST_NET,
 
        rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice,
                      OBD_FAIL_MDS_ALL_REQUEST_NET,
@@ -5053,6 +5071,8 @@ err_tgt:
 err_capa:
        cfs_timer_disarm(&m->mdt_ck_timer);
        mdt_ck_thread_stop(m);
 err_capa:
        cfs_timer_disarm(&m->mdt_ck_timer);
        mdt_ck_thread_stop(m);
+err_free_hsm:
+       mdt_hsm_cdt_fini(m);
 err_free_ns:
        ldlm_namespace_free(m->mdt_namespace, NULL, 0);
        obd->obd_namespace = m->mdt_namespace = NULL;
 err_free_ns:
        ldlm_namespace_free(m->mdt_namespace, NULL, 0);
        obd->obd_namespace = m->mdt_namespace = NULL;
index bbebf98..46511fe 100644 (file)
@@ -55,7 +55,7 @@
  * Update on-disk HSM attributes.
  */
 int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj,
  * Update on-disk HSM attributes.
  */
 int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj,
-                    struct md_hsm *mh)
+                    const struct md_hsm *mh)
 {
        struct md_object        *next = mdt_object_child(obj);
        struct lu_buf           *buf = &info->mti_buf;
 {
        struct md_object        *next = mdt_object_child(obj);
        struct lu_buf           *buf = &info->mti_buf;
@@ -505,9 +505,6 @@ int mdt_hsm_request(struct mdt_thread_info *info)
        }
 
        rc = mdt_hsm_add_actions(info, hal, &compound_id);
        }
 
        rc = mdt_hsm_add_actions(info, hal, &compound_id);
-       /* ENODATA error code is needed only for implicit requests */
-       if (rc == -ENODATA)
-               rc = 0;
 
        MDT_HSM_FREE(hal, hal_size);
 
 
        MDT_HSM_FREE(hal, hal_size);
 
index 5ee61bc..8a08005 100644 (file)
@@ -43,7 +43,8 @@
 #include <lustre_log.h>
 #include "mdt_internal.h"
 
 #include <lustre_log.h>
 #include "mdt_internal.h"
 
-void dump_llog_agent_req_rec(char *prefix, struct llog_agent_req_rec *larr)
+void dump_llog_agent_req_rec(const char *prefix,
+                            const struct llog_agent_req_rec *larr)
 {
        char    buf[12];
        int     sz;
 {
        char    buf[12];
        int     sz;
@@ -95,7 +96,7 @@ int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
        if ((lctxt == NULL) || (lctxt->loc_handle == NULL))
                RETURN(-ENOENT);
 
        if ((lctxt == NULL) || (lctxt->loc_handle == NULL))
                RETURN(-ENOENT);
 
-       down(&cdt->cdt_llog_lock);
+       mutex_lock(&cdt->cdt_llog_lock);
 
        rc = llog_cat_process(env, lctxt->loc_handle, cb, data, 0, 0);
        if (rc < 0)
 
        rc = llog_cat_process(env, lctxt->loc_handle, cb, data, 0, 0);
        if (rc < 0)
@@ -105,7 +106,7 @@ int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
                rc = 0;
 
        llog_ctxt_put(lctxt);
                rc = 0;
 
        llog_ctxt_put(lctxt);
-       up(&cdt->cdt_llog_lock);
+       mutex_unlock(&cdt->cdt_llog_lock);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -150,7 +151,7 @@ int mdt_agent_record_add(const struct lu_env *env,
        if ((lctxt == NULL) || (lctxt->loc_handle == NULL))
                GOTO(free, rc = -ENOENT);
 
        if ((lctxt == NULL) || (lctxt->loc_handle == NULL))
                GOTO(free, rc = -ENOENT);
 
-       down(&cdt->cdt_llog_lock);
+       mutex_lock(&cdt->cdt_llog_lock);
 
        /* in case of cancel request, the cookie is already set to the
         * value of the request cookie to be cancelled
 
        /* in case of cancel request, the cookie is already set to the
         * value of the request cookie to be cancelled
@@ -164,7 +165,7 @@ int mdt_agent_record_add(const struct lu_env *env,
        if (rc > 0)
                rc = 0;
 
        if (rc > 0)
                rc = 0;
 
-       up(&cdt->cdt_llog_lock);
+       mutex_unlock(&cdt->cdt_llog_lock);
        llog_ctxt_put(lctxt);
 
        EXIT;
        llog_ctxt_put(lctxt);
 
        EXIT;
index f5c60eb..b7c4c93 100644 (file)
@@ -367,30 +367,35 @@ int mdt_hsm_agent_send(struct mdt_thread_info *mti,
                        struct mdt_object *obj;
                        struct md_hsm hsm;
 
                        struct mdt_object *obj;
                        struct md_hsm hsm;
 
-                       obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm,
-                                                NULL);
-                       if (IS_ERR(obj) && (hai->hai_action == HSMA_REMOVE))
-                               continue;
+                       obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
+                       if (!IS_ERR(obj)) {
+                               mdt_object_put(mti->mti_env, obj);
+                       } else {
+                               if (hai->hai_action == HSMA_REMOVE)
+                                       continue;
 
 
-                       if (IS_ERR(obj) && (PTR_ERR(obj) == -ENOENT)) {
-                               fail_request = true;
-                               rc = mdt_agent_record_update(mti->mti_env, mdt,
+                               if (PTR_ERR(obj) == -ENOENT) {
+                                       fail_request = true;
+                                       rc = mdt_agent_record_update(
+                                                            mti->mti_env, mdt,
                                                             &hai->hai_cookie,
                                                             1, ARS_FAILED);
                                                             &hai->hai_cookie,
                                                             1, ARS_FAILED);
-                               if (rc) {
-                                       CERROR("%s: mdt_agent_record_update() "
+                                       if (rc) {
+                                               CERROR(
+                                             "%s: mdt_agent_record_update() "
                                              "failed, rc=%d, cannot update "
                                              "status to %s for cookie "
                                              LPX64": rc = %d\n",
                                              mdt_obd_name(mdt), rc,
                                              agent_req_status2name(ARS_FAILED),
                                              hai->hai_cookie, rc);
                                              "failed, rc=%d, cannot update "
                                              "status to %s for cookie "
                                              LPX64": rc = %d\n",
                                              mdt_obd_name(mdt), rc,
                                              agent_req_status2name(ARS_FAILED),
                                              hai->hai_cookie, rc);
-                                       GOTO(out_buf, rc);
+                                               GOTO(out_buf, rc);
+                                       }
+                                       continue;
                                }
                                }
-                               continue;
-                       }
-                       if (IS_ERR(obj))
                                GOTO(out_buf, rc = PTR_ERR(obj));
                                GOTO(out_buf, rc = PTR_ERR(obj));
+                       }
+
 
                        if (!mdt_hsm_is_action_compat(hai, hal->hal_archive_id,
                                                      hal->hal_flags, &hsm)) {
 
                        if (!mdt_hsm_is_action_compat(hai, hal->hal_archive_id,
                                                      hal->hal_flags, &hsm)) {
index af2ddf4..b47acea 100644 (file)
@@ -326,7 +326,7 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti,
                        goto record;
 
                /* get HSM attributes */
                        goto record;
 
                /* get HSM attributes */
-               obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh, NULL);
+               obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh);
                if (IS_ERR(obj)) {
                        /* in case of archive remove, Lustre file
                         * is not mandatory */
                if (IS_ERR(obj)) {
                        /* in case of archive remove, Lustre file
                         * is not mandatory */
@@ -334,6 +334,7 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti,
                                goto record;
                        GOTO(out, rc = PTR_ERR(obj));
                }
                                goto record;
                        GOTO(out, rc = PTR_ERR(obj));
                }
+               mdt_object_put(mti->mti_env, obj);
 
                /* Check if an action is needed, compare request
                 * and HSM flags status */
 
                /* Check if an action is needed, compare request
                 * and HSM flags status */
@@ -364,7 +365,7 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti,
                        struct cdt_restore_handle       *crh;
                        struct mdt_object               *child;
 
                        struct cdt_restore_handle       *crh;
                        struct mdt_object               *child;
 
-                       OBD_ALLOC_PTR(crh);
+                       OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
                        if (crh == NULL)
                                GOTO(out, rc = -ENOMEM);
 
                        if (crh == NULL)
                                GOTO(out, rc = -ENOMEM);
 
@@ -386,7 +387,7 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti,
                                CERROR("%s: cannot take layout lock for "
                                       DFID": rc = %d\n", mdt_obd_name(mdt),
                                       PFID(&crh->crh_fid), rc);
                                CERROR("%s: cannot take layout lock for "
                                       DFID": rc = %d\n", mdt_obd_name(mdt),
                                       PFID(&crh->crh_fid), rc);
-                               OBD_FREE_PTR(crh);
+                               OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
                                GOTO(out, rc);
                        }
                        /* we choose to not keep a keep a reference
                                GOTO(out, rc);
                        }
                        /* we choose to not keep a keep a reference
@@ -394,10 +395,10 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti,
                         * very long */
                        mdt_object_put(mti->mti_env, child);
 
                         * very long */
                        mdt_object_put(mti->mti_env, child);
 
-                       down(&cdt->cdt_restore_lock);
+                       mutex_lock(&cdt->cdt_restore_lock);
                        cfs_list_add_tail(&crh->crh_list,
                                          &cdt->cdt_restore_hdl);
                        cfs_list_add_tail(&crh->crh_list,
                                          &cdt->cdt_restore_hdl);
-                       up(&cdt->cdt_restore_lock);
+                       mutex_unlock(&cdt->cdt_restore_lock);
                }
 record:
                /* record request */
                }
 record:
                /* record request */
@@ -411,7 +412,8 @@ record:
                rc = -ENODATA;
        else
                rc = 0;
                rc = -ENODATA;
        else
                rc = 0;
-       EXIT;
+
+       GOTO(out, rc);
 out:
        /* if work has been added, wake up coordinator */
        if ((rc == 0) || (rc == -ENODATA))
 out:
        /* if work has been added, wake up coordinator */
        if ((rc == 0) || (rc == -ENODATA))
@@ -444,7 +446,7 @@ int mdt_hsm_get_running(struct mdt_thread_info *mti,
                        RETURN(-EINVAL);
 
                car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
                        RETURN(-EINVAL);
 
                car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
-               if (IS_ERR(car)) {
+               if (car == NULL) {
                        hai->hai_cookie = 0;
                        hai->hai_action = HSMA_NONE;
                } else {
                        hai->hai_cookie = 0;
                        hai->hai_action = HSMA_NONE;
                } else {
@@ -477,7 +479,7 @@ bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
        if (!fid_is_sane(fid))
                RETURN(rc);
 
        if (!fid_is_sane(fid))
                RETURN(rc);
 
-       down(&cdt->cdt_restore_lock);
+       mutex_lock(&cdt->cdt_restore_lock);
        cfs_list_for_each_safe(pos, tmp, &cdt->cdt_restore_hdl) {
                crh = cfs_list_entry(pos, struct cdt_restore_handle, crh_list);
                if (lu_fid_eq(&crh->crh_fid, fid)) {
        cfs_list_for_each_safe(pos, tmp, &cdt->cdt_restore_hdl) {
                crh = cfs_list_entry(pos, struct cdt_restore_handle, crh_list);
                if (lu_fid_eq(&crh->crh_fid, fid)) {
@@ -485,7 +487,7 @@ bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
                        break;
                }
        }
                        break;
                }
        }
-       up(&cdt->cdt_restore_lock);
+       mutex_unlock(&cdt->cdt_restore_lock);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -533,7 +535,7 @@ int mdt_hsm_get_actions(struct mdt_thread_info *mti,
                struct cdt_agent_req *car;
 
                car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
                struct cdt_agent_req *car;
 
                car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
-               if (IS_ERR(car)) {
+               if (car == NULL) {
                        hai->hai_cookie = 0;
                } else {
                        __u64 data_moved;
                        hai->hai_cookie = 0;
                } else {
                        __u64 data_moved;
index 9d20164..7029827 100644 (file)
@@ -153,8 +153,8 @@ out:
 /**
  * update data moved information during a request
  */
 /**
  * update data moved information during a request
  */
-static int mdt_cdt_update_work(struct cdt_req_progress *crp,
-                              struct hsm_extent *extent)
+static int hsm_update_work(struct cdt_req_progress *crp,
+                          const struct hsm_extent *extent)
 {
        int                       rc, osz, nsz;
        struct interval_node    **new_vv;
 {
        int                       rc, osz, nsz;
        struct interval_node    **new_vv;
@@ -239,11 +239,11 @@ struct cdt_agent_req *mdt_cdt_alloc_request(__u64 compound_id, __u32 archive_id,
        struct cdt_agent_req *car;
        ENTRY;
 
        struct cdt_agent_req *car;
        ENTRY;
 
-       OBD_ALLOC_PTR(car);
+       OBD_SLAB_ALLOC_PTR(car, mdt_hsm_car_kmem);
        if (car == NULL)
                RETURN(ERR_PTR(-ENOMEM));
 
        if (car == NULL)
                RETURN(ERR_PTR(-ENOMEM));
 
-       cfs_atomic_set(&car->car_refcount, 0);
+       cfs_atomic_set(&car->car_refcount, 1);
        car->car_compound_id = compound_id;
        car->car_archive_id = archive_id;
        car->car_flags = flags;
        car->car_compound_id = compound_id;
        car->car_archive_id = archive_id;
        car->car_flags = flags;
@@ -253,7 +253,7 @@ struct cdt_agent_req *mdt_cdt_alloc_request(__u64 compound_id, __u32 archive_id,
        car->car_uuid = *uuid;
        OBD_ALLOC(car->car_hai, hai->hai_len);
        if (car->car_hai == NULL) {
        car->car_uuid = *uuid;
        OBD_ALLOC(car->car_hai, hai->hai_len);
        if (car->car_hai == NULL) {
-               OBD_FREE_PTR(car);
+               OBD_SLAB_FREE_PTR(car, mdt_hsm_car_kmem);
                RETURN(ERR_PTR(-ENOMEM));
        }
        memcpy(car->car_hai, hai, hai->hai_len);
                RETURN(ERR_PTR(-ENOMEM));
        }
        memcpy(car->car_hai, hai, hai->hai_len);
@@ -271,7 +271,7 @@ void mdt_cdt_free_request(struct cdt_agent_req *car)
 {
        mdt_cdt_free_request_tree(&car->car_progress);
        OBD_FREE(car->car_hai, car->car_hai->hai_len);
 {
        mdt_cdt_free_request_tree(&car->car_progress);
        OBD_FREE(car->car_hai, car->car_hai->hai_len);
-       OBD_FREE_PTR(car);
+       OBD_SLAB_FREE_PTR(car, mdt_hsm_car_kmem);
 }
 
 /**
 }
 
 /**
@@ -290,6 +290,7 @@ void mdt_cdt_get_request(struct cdt_agent_req *car)
  */
 void mdt_cdt_put_request(struct cdt_agent_req *car)
 {
  */
 void mdt_cdt_put_request(struct cdt_agent_req *car)
 {
+       LASSERT(cfs_atomic_read(&car->car_refcount) > 0);
        if (cfs_atomic_dec_and_test(&car->car_refcount))
                mdt_cdt_free_request(car);
 }
        if (cfs_atomic_dec_and_test(&car->car_refcount))
                mdt_cdt_free_request(car);
 }
@@ -306,25 +307,20 @@ static struct cdt_agent_req *cdt_find_request_nolock(struct coordinator *cdt,
                                                     __u64 cookie,
                                                     const struct lu_fid *fid)
 {
                                                     __u64 cookie,
                                                     const struct lu_fid *fid)
 {
-       cfs_list_t              *pos;
-       struct cdt_agent_req    *car;
+       struct cdt_agent_req *car;
+       struct cdt_agent_req *found = NULL;
        ENTRY;
 
        ENTRY;
 
-       if (cfs_list_empty(&cdt->cdt_requests))
-               goto notfound;
-
-       cfs_list_for_each(pos, &cdt->cdt_requests) {
-               car = cfs_list_entry(pos, struct cdt_agent_req,
-                                    car_request_list);
+       cfs_list_for_each_entry(car, &cdt->cdt_requests, car_request_list) {
                if ((car->car_hai->hai_cookie == cookie) ||
                    ((fid != NULL) && lu_fid_eq(fid, &car->car_hai->hai_fid))) {
                        mdt_cdt_get_request(car);
                if ((car->car_hai->hai_cookie == cookie) ||
                    ((fid != NULL) && lu_fid_eq(fid, &car->car_hai->hai_fid))) {
                        mdt_cdt_get_request(car);
-                       RETURN(car);
+                       found = car;
+                       break;
                }
        }
 
                }
        }
 
-notfound:
-       RETURN(ERR_PTR(-ENOENT));
+       RETURN(found);
 }
 
 /**
 }
 
 /**
@@ -343,23 +339,19 @@ int mdt_cdt_add_request(struct coordinator *cdt, struct cdt_agent_req *new_car)
        LASSERT(new_car->car_hai->hai_action != HSMA_CANCEL);
 
        down_write(&cdt->cdt_request_lock);
        LASSERT(new_car->car_hai->hai_action != HSMA_CANCEL);
 
        down_write(&cdt->cdt_request_lock);
-
        car = cdt_find_request_nolock(cdt, new_car->car_hai->hai_cookie, NULL);
        car = cdt_find_request_nolock(cdt, new_car->car_hai->hai_cookie, NULL);
-       if (!IS_ERR(car)) {
+       if (car != NULL) {
                mdt_cdt_put_request(car);
                up_write(&cdt->cdt_request_lock);
                RETURN(-EEXIST);
        }
 
                mdt_cdt_put_request(car);
                up_write(&cdt->cdt_request_lock);
                RETURN(-EEXIST);
        }
 
-       mdt_cdt_get_request(new_car);
        cfs_list_add_tail(&new_car->car_request_list, &cdt->cdt_requests);
        up_write(&cdt->cdt_request_lock);
 
        mdt_hsm_agent_update_statistics(cdt, 0, 0, 1, &new_car->car_uuid);
 
        cfs_list_add_tail(&new_car->car_request_list, &cdt->cdt_requests);
        up_write(&cdt->cdt_request_lock);
 
        mdt_hsm_agent_update_statistics(cdt, 0, 0, 1, &new_car->car_uuid);
 
-       down(&cdt->cdt_counter_lock);
-       cdt->cdt_request_count++;
-       up(&cdt->cdt_counter_lock);
+       atomic_inc(&cdt->cdt_request_count);
 
        RETURN(0);
 }
 
        RETURN(0);
 }
@@ -372,16 +364,14 @@ int mdt_cdt_add_request(struct coordinator *cdt, struct cdt_agent_req *new_car)
  * \retval request pointer
  */
 struct cdt_agent_req *mdt_cdt_find_request(struct coordinator *cdt,
  * \retval request pointer
  */
 struct cdt_agent_req *mdt_cdt_find_request(struct coordinator *cdt,
-                                          __u64 cookie,
+                                          const __u64 cookie,
                                           const struct lu_fid *fid)
 {
        struct cdt_agent_req    *car;
        ENTRY;
 
        down_read(&cdt->cdt_request_lock);
                                           const struct lu_fid *fid)
 {
        struct cdt_agent_req    *car;
        ENTRY;
 
        down_read(&cdt->cdt_request_lock);
-
        car = cdt_find_request_nolock(cdt, cookie, fid);
        car = cdt_find_request_nolock(cdt, cookie, fid);
-
        up_read(&cdt->cdt_request_lock);
 
        RETURN(car);
        up_read(&cdt->cdt_request_lock);
 
        RETURN(car);
@@ -395,20 +385,23 @@ struct cdt_agent_req *mdt_cdt_find_request(struct coordinator *cdt,
  */
 int mdt_cdt_remove_request(struct coordinator *cdt, __u64 cookie)
 {
  */
 int mdt_cdt_remove_request(struct coordinator *cdt, __u64 cookie)
 {
-       struct cdt_agent_req    *car;
+       struct cdt_agent_req *car;
        ENTRY;
 
        down_write(&cdt->cdt_request_lock);
        ENTRY;
 
        down_write(&cdt->cdt_request_lock);
-
        car = cdt_find_request_nolock(cdt, cookie, NULL);
        car = cdt_find_request_nolock(cdt, cookie, NULL);
-       if (!IS_ERR(car)) {
+       if (car != NULL) {
                cfs_list_del(&car->car_request_list);
                cfs_list_del(&car->car_request_list);
-               mdt_cdt_put_request(car);
                up_write(&cdt->cdt_request_lock);
 
                up_write(&cdt->cdt_request_lock);
 
-               down(&cdt->cdt_counter_lock);
-               cdt->cdt_request_count--;
-               up(&cdt->cdt_counter_lock);
+               /* reference from cdt_requests list */
+               mdt_cdt_put_request(car);
+
+               /* reference from cdt_find_request_nolock() */
+               mdt_cdt_put_request(car);
+
+               LASSERT(atomic_read(&cdt->cdt_request_count) > 0);
+               atomic_dec(&cdt->cdt_request_count);
 
                RETURN(0);
        }
 
                RETURN(0);
        }
@@ -426,21 +419,21 @@ int mdt_cdt_remove_request(struct coordinator *cdt, __u64 cookie)
  * \retval -ve failure
  */
 struct cdt_agent_req *mdt_cdt_update_request(struct coordinator *cdt,
  * \retval -ve failure
  */
 struct cdt_agent_req *mdt_cdt_update_request(struct coordinator *cdt,
-                                            struct hsm_progress_kernel *pgs)
+                                         const struct hsm_progress_kernel *pgs)
 {
        struct cdt_agent_req    *car;
        int                      rc;
        ENTRY;
 
        car = mdt_cdt_find_request(cdt, pgs->hpk_cookie, NULL);
 {
        struct cdt_agent_req    *car;
        int                      rc;
        ENTRY;
 
        car = mdt_cdt_find_request(cdt, pgs->hpk_cookie, NULL);
-       if (IS_ERR(car))
-               RETURN(car);
+       if (car == NULL)
+               RETURN(ERR_PTR(-ENOENT));
 
        car->car_req_update = cfs_time_current_sec();
 
        /* update progress done by copy tool */
        if (pgs->hpk_errval == 0 && pgs->hpk_extent.length != 0) {
 
        car->car_req_update = cfs_time_current_sec();
 
        /* update progress done by copy tool */
        if (pgs->hpk_errval == 0 && pgs->hpk_extent.length != 0) {
-               rc = mdt_cdt_update_work(&car->car_progress, &pgs->hpk_extent);
+               rc = hsm_update_work(&car->car_progress, &pgs->hpk_extent);
                if (rc) {
                        mdt_cdt_put_request(car);
                        RETURN(ERR_PTR(rc));
                if (rc) {
                        mdt_cdt_put_request(car);
                        RETURN(ERR_PTR(rc));
index 42ff0b6..e45190a 100644 (file)
@@ -92,7 +92,7 @@ struct mdt_file_data {
 /* when adding a new policy, do not forget to update
  * lustre/mdt/mdt_coordinator.c::hsm_policy_names[]
  */
 /* when adding a new policy, do not forget to update
  * lustre/mdt/mdt_coordinator.c::hsm_policy_names[]
  */
-#define CDT_DEFAULT_POLICY             0x0000000000000000ULL
+#define CDT_DEFAULT_POLICY             CDT_NORETRY_ACTION
 
 enum cdt_states { CDT_STOPPED = 0,
                  CDT_INIT,
 
 enum cdt_states { CDT_STOPPED = 0,
                  CDT_INIT,
@@ -108,27 +108,26 @@ enum cdt_states { CDT_STOPPED = 0,
  * cdt_request_lock
  */
 struct coordinator {
  * cdt_request_lock
  */
 struct coordinator {
-       struct ptlrpc_thread    *cdt_thread;        /**< coordinator thread */
+       struct ptlrpc_thread     cdt_thread;        /**< coordinator thread */
        struct lu_env            cdt_env;           /**< coordinator lustre
                                                     * env */
        struct lu_env            cdt_env;           /**< coordinator lustre
                                                     * env */
+       struct lu_context        cdt_session;       /** session for lu_ucred */
        struct proc_dir_entry   *cdt_proc_dir;      /**< cdt /proc directory */
        __u64                    cdt_policy;        /**< flags to defined
                                                     * policy */
        enum cdt_states          cdt_state;         /**< state */
        struct proc_dir_entry   *cdt_proc_dir;      /**< cdt /proc directory */
        __u64                    cdt_policy;        /**< flags to defined
                                                     * policy */
        enum cdt_states          cdt_state;         /**< state */
-       cfs_atomic_t             cdt_compound_id;   /**< compound id counter */
+       atomic_t                 cdt_compound_id;   /**< compound id counter */
        __u64                    cdt_last_cookie;   /**< last cookie allocated */
        __u64                    cdt_last_cookie;   /**< last cookie allocated */
-       struct semaphore         cdt_counter_lock;  /**< protect request
-                                                    * counter */
-       struct semaphore         cdt_llog_lock;     /**< protect llog access */
+       struct mutex             cdt_llog_lock;     /**< protect llog access */
        struct rw_semaphore      cdt_agent_lock;    /**< protect agent list */
        struct rw_semaphore      cdt_request_lock;  /**< protect request list */
        struct rw_semaphore      cdt_agent_lock;    /**< protect agent list */
        struct rw_semaphore      cdt_request_lock;  /**< protect request list */
-       struct semaphore         cdt_restore_lock;  /**< protect restore list */
+       struct mutex             cdt_restore_lock;  /**< protect restore list */
        cfs_time_t               cdt_loop_period;   /**< llog scan period */
        cfs_time_t               cdt_delay;         /**< request grace delay */
        cfs_time_t               cdt_timeout;       /**< request timeout */
        __u64                    cdt_max_request;   /**< max count of started
                                                     * requests */
        cfs_time_t               cdt_loop_period;   /**< llog scan period */
        cfs_time_t               cdt_delay;         /**< request grace delay */
        cfs_time_t               cdt_timeout;       /**< request timeout */
        __u64                    cdt_max_request;   /**< max count of started
                                                     * requests */
-       __u64                    cdt_request_count; /**< current count of
+       atomic_t                 cdt_request_count; /**< current count of
                                                     * started requests */
        cfs_list_t               cdt_requests;      /**< list of started
                                                     * requests */
                                                     * started requests */
        cfs_list_t               cdt_requests;      /**< list of started
                                                     * requests */
@@ -297,13 +296,13 @@ struct mdt_lock_handle {
 };
 
 enum {
 };
 
 enum {
-       MDT_LH_PARENT, /* parent lockh */
-       MDT_LH_CHILD,  /* child lockh */
-       MDT_LH_OLD,    /* old lockh for rename */
+       MDT_LH_PARENT,  /* parent lockh */
+       MDT_LH_CHILD,   /* child lockh */
+       MDT_LH_OLD,     /* old lockh for rename */
        MDT_LH_LAYOUT = MDT_LH_OLD, /* layout lock */
        MDT_LH_LAYOUT = MDT_LH_OLD, /* layout lock */
-       MDT_LH_NEW,    /* new lockh for rename */
-       MDT_LH_RMT,    /* used for return lh to caller */
-       MDT_LH_LOCAL,  /* local lock never return to client */
+       MDT_LH_NEW,     /* new lockh for rename */
+       MDT_LH_RMT,     /* used for return lh to caller */
+       MDT_LH_LOCAL,   /* local lock never return to client */
        MDT_LH_NR
 };
 
        MDT_LH_NR
 };
 
@@ -590,6 +589,7 @@ struct cdt_agent_req {
        struct cdt_req_progress  car_progress;     /**< track data mvt
                                                    *   progress */
 };
        struct cdt_req_progress  car_progress;     /**< track data mvt
                                                    *   progress */
 };
+extern struct kmem_cache *mdt_hsm_car_kmem;
 
 struct hsm_agent {
        cfs_list_t       ha_list;               /**< to chain the agents */
 
 struct hsm_agent {
        cfs_list_t       ha_list;               /**< to chain the agents */
@@ -609,6 +609,7 @@ struct cdt_restore_handle {
        struct ldlm_extent       crh_extent;    /**< extent of the restore */
        struct mdt_lock_handle   crh_lh;        /**< lock handle */
 };
        struct ldlm_extent       crh_extent;    /**< extent of the restore */
        struct mdt_lock_handle   crh_lh;        /**< lock handle */
 };
+extern struct kmem_cache *mdt_hsm_cdt_kmem;    /** restore handle slab cache */
 
 static inline const struct md_device_operations *
 mdt_child_ops(struct mdt_device * m)
 
 static inline const struct md_device_operations *
 mdt_child_ops(struct mdt_device * m)
@@ -877,7 +878,7 @@ extern struct lprocfs_vars lprocfs_mds_module_vars[];
 extern struct lprocfs_vars lprocfs_mds_obd_vars[];
 
 int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj,
 extern struct lprocfs_vars lprocfs_mds_obd_vars[];
 
 int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj,
-                    struct md_hsm *mh);
+                    const struct md_hsm *mh);
 
 struct mdt_handler *mdt_handler_find(__u32 opc,
                                     struct mdt_opc_slice *supported);
 
 struct mdt_handler *mdt_handler_find(__u32 opc,
                                     struct mdt_opc_slice *supported);
@@ -935,7 +936,8 @@ int mdt_hsm_ct_unregister(struct mdt_thread_info *info);
 int mdt_hsm_request(struct mdt_thread_info *info);
 /* mdt/mdt_hsm_cdt_actions.c */
 extern const struct file_operations mdt_agent_actions_fops;
 int mdt_hsm_request(struct mdt_thread_info *info);
 /* mdt/mdt_hsm_cdt_actions.c */
 extern const struct file_operations mdt_agent_actions_fops;
-void dump_llog_agent_req_rec(char *prefix, struct llog_agent_req_rec *larr);
+void dump_llog_agent_req_rec(const char *prefix,
+                            const struct llog_agent_req_rec *larr);
 int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
                     llog_cb_t cb, void *data);
 int mdt_agent_record_add(const struct lu_env *env, struct mdt_device *mdt,
 int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
                     llog_cb_t cb, void *data);
 int mdt_agent_record_add(const struct lu_env *env, struct mdt_device *mdt,
@@ -976,7 +978,6 @@ int mdt_hsm_get_running(struct mdt_thread_info *mti,
                        struct hsm_action_list *hal);
 bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
                                const struct lu_fid *fid);
                        struct hsm_action_list *hal);
 bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
                                const struct lu_fid *fid);
-
 /* mdt/mdt_hsm_cdt_requests.c */
 extern const struct file_operations mdt_hsm_request_fops;
 void dump_requests(char *prefix, struct coordinator *cdt);
 /* mdt/mdt_hsm_cdt_requests.c */
 extern const struct file_operations mdt_hsm_request_fops;
 void dump_requests(char *prefix, struct coordinator *cdt);
@@ -986,75 +987,42 @@ struct cdt_agent_req *mdt_cdt_alloc_request(__u64 compound_id, __u32 archive_id,
 void mdt_cdt_free_request(struct cdt_agent_req *car);
 int mdt_cdt_add_request(struct coordinator *cdt, struct cdt_agent_req *new_car);
 struct cdt_agent_req *mdt_cdt_find_request(struct coordinator *cdt,
 void mdt_cdt_free_request(struct cdt_agent_req *car);
 int mdt_cdt_add_request(struct coordinator *cdt, struct cdt_agent_req *new_car);
 struct cdt_agent_req *mdt_cdt_find_request(struct coordinator *cdt,
-                                          __u64 cookie,
+                                          const __u64 cookie,
                                           const struct lu_fid *fid);
 void mdt_cdt_get_work_done(struct cdt_agent_req *car, __u64 *done_sz);
 void mdt_cdt_get_request(struct cdt_agent_req *car);
 void mdt_cdt_put_request(struct cdt_agent_req *car);
 struct cdt_agent_req *mdt_cdt_update_request(struct coordinator *cdt,
                                           const struct lu_fid *fid);
 void mdt_cdt_get_work_done(struct cdt_agent_req *car, __u64 *done_sz);
 void mdt_cdt_get_request(struct cdt_agent_req *car);
 void mdt_cdt_put_request(struct cdt_agent_req *car);
 struct cdt_agent_req *mdt_cdt_update_request(struct coordinator *cdt,
-                                            struct hsm_progress_kernel *pgs);
+                                        const struct hsm_progress_kernel *pgs);
 int mdt_cdt_remove_request(struct coordinator *cdt, __u64 cookie);
 int mdt_cdt_remove_request(struct coordinator *cdt, __u64 cookie);
-
-/* fake functions, will be remove with patch LU-3343 */
-static inline struct mdt_object *mdt_hsm_get_md_hsm(struct mdt_thread_info *mti,
-                                                   struct lu_fid *fid,
-                                                   struct md_hsm *hsm,
-                                                   struct mdt_lock_handle *lh)
-{
-       return ERR_PTR(-EINVAL);
-}
-static inline bool mdt_hsm_is_action_compat(struct hsm_action_item *hai,
-                                           int hal_an, __u64 rq_flags,
-                                           struct md_hsm *hsm)
-{
-       return false;
-}
-static inline int mdt_hsm_cdt_init(struct mdt_device *mdt)
-{
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
-
-       /* minimal init before final patch landing */
-       sema_init(&cdt->cdt_llog_lock, 1);
-       init_rwsem(&cdt->cdt_agent_lock);
-       init_rwsem(&cdt->cdt_request_lock);
-       sema_init(&cdt->cdt_restore_lock, 1);
-
-       CFS_INIT_LIST_HEAD(&cdt->cdt_requests);
-       CFS_INIT_LIST_HEAD(&cdt->cdt_agents);
-       CFS_INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
-
-       cdt->cdt_state = CDT_STOPPED;
-       return 0;
-}
-static inline int mdt_hsm_cdt_start(struct mdt_device *mdt)
-{
-       return 0;
-}
-static inline int mdt_hsm_cdt_stop(struct mdt_device *mdt)
-{
-       return 0;
-}
-static inline int mdt_hsm_cdt_fini(struct mdt_device *mdt)
-{
-       return 0;
-}
-static inline int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
-{
-       return 0;
-}
-static inline int mdt_hsm_add_hal(struct mdt_thread_info *mti,
-                                 struct hsm_action_list *hal,
-                                 const struct obd_uuid *uuid)
-{
-       return 0;
-}
-static inline int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
-                                              struct hsm_progress_kernel *pgs,
-                                              bool update_record)
-{
-       return 0;
-}
-/* end of fake functions */
+/* mdt/mdt_coordinator.c */
+void mdt_hsm_dump_hal(int level, const char *prefix,
+                     struct hsm_action_list *hal);
+/* coordinator management */
+int mdt_hsm_cdt_init(struct mdt_device *mdt);
+int mdt_hsm_cdt_start(struct mdt_device *mdt);
+int mdt_hsm_cdt_stop(struct mdt_device *mdt);
+int mdt_hsm_cdt_fini(struct mdt_device *mdt);
+int mdt_hsm_cdt_wakeup(struct mdt_device *mdt);
+
+/* coordinator control /proc interface */
+int lprocfs_wr_hsm_cdt_control(struct file *file, const char *buffer,
+                              unsigned long count, void *data);
+int lprocfs_rd_hsm_cdt_control(char *page, char **start, off_t off,
+                              int count, int *eof, void *data);
+/* md_hsm helpers */
+struct mdt_object *mdt_hsm_get_md_hsm(struct mdt_thread_info *mti,
+                                     const struct lu_fid *fid,
+                                     struct md_hsm *hsm);
+/* actions/request helpers */
+int mdt_hsm_add_hal(struct mdt_thread_info *mti,
+                   struct hsm_action_list *hal, struct obd_uuid *uuid);
+bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
+                             const int hal_an, const __u64 rq_flags,
+                             const struct md_hsm *hsm);
+int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
+                                struct hsm_progress_kernel *pgs,
+                                const int update_record);
 
 extern struct lu_context_key       mdt_thread_key;
 /* debug issues helper starts here*/
 
 extern struct lu_context_key       mdt_thread_key;
 /* debug issues helper starts here*/
index 17aa9e0..fd2b92d 100644 (file)
@@ -949,50 +949,79 @@ static int lprocfs_wr_enable_remote_dir_gid(struct file *file,
 }
 
 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
 }
 
 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
-        { "uuid",                       lprocfs_rd_uuid,                 0, 0 },
-        { "recovery_status",            lprocfs_obd_rd_recovery_status,  0, 0 },
-        { "num_exports",                lprocfs_rd_num_exports,          0, 0 },
-        { "identity_expire",            lprocfs_rd_identity_expire,
-                                        lprocfs_wr_identity_expire,         0 },
-        { "identity_acquire_expire",    lprocfs_rd_identity_acquire_expire,
-                                        lprocfs_wr_identity_acquire_expire, 0 },
-        { "identity_upcall",            lprocfs_rd_identity_upcall,
-                                        lprocfs_wr_identity_upcall,         0 },
-        { "identity_flush",             0, lprocfs_wr_identity_flush,       0 },
-        { "identity_info",              0, lprocfs_wr_identity_info,        0 },
-        { "capa",                       lprocfs_rd_capa,
-                                        lprocfs_wr_capa,                    0 },
-        { "capa_timeout",               lprocfs_rd_capa_timeout,
-                                        lprocfs_wr_capa_timeout,            0 },
-        { "capa_key_timeout",           lprocfs_rd_ck_timeout,
-                                        lprocfs_wr_ck_timeout,              0 },
-        { "capa_count",                 lprocfs_rd_capa_count,           0, 0 },
-        { "site_stats",                 lprocfs_rd_site_stats,           0, 0 },
-        { "evict_client",               0, lprocfs_mdt_wr_evict_client,     0 },
-        { "hash_stats",                 lprocfs_obd_rd_hash,    0, 0 },
-        { "sec_level",                  lprocfs_rd_sec_level,
-                                        lprocfs_wr_sec_level,               0 },
-        { "commit_on_sharing",          lprocfs_rd_cos, lprocfs_wr_cos, 0 },
-        { "root_squash",                lprocfs_rd_root_squash,
-                                        lprocfs_wr_root_squash,             0 },
-        { "nosquash_nids",              lprocfs_rd_nosquash_nids,
-                                        lprocfs_wr_nosquash_nids,           0 },
-        { "som",                        lprocfs_rd_mdt_som,
-                                        lprocfs_wr_mdt_som, 0 },
-        { "instance",                   lprocfs_target_rd_instance,         0 },
-        { "ir_factor",                  lprocfs_obd_rd_ir_factor,
-                                        lprocfs_obd_wr_ir_factor,           0 },
+       { "uuid",                       lprocfs_rd_uuid, NULL,
+                                       NULL, NULL, 0 },
+       { "recovery_status",            lprocfs_obd_rd_recovery_status, NULL,
+                                       NULL, NULL, 0 },
+       { "num_exports",                lprocfs_rd_num_exports, NULL,
+                                       NULL, NULL, 0 },
+       { "identity_expire",            lprocfs_rd_identity_expire,
+                                       lprocfs_wr_identity_expire,
+                                       NULL, NULL, 0 },
+       { "identity_acquire_expire",    lprocfs_rd_identity_acquire_expire,
+                                       lprocfs_wr_identity_acquire_expire,
+                                       NULL, NULL, 0 },
+       { "identity_upcall",            lprocfs_rd_identity_upcall,
+                                       lprocfs_wr_identity_upcall,
+                                       NULL, NULL, 0 },
+       { "identity_flush",             NULL, lprocfs_wr_identity_flush,
+                                       NULL, NULL, 0 },
+       { "identity_info",              NULL, lprocfs_wr_identity_info,
+                                       NULL, NULL, 0 },
+       { "capa",                       lprocfs_rd_capa,
+                                       lprocfs_wr_capa,
+                                       NULL, NULL, 0 },
+       { "capa_timeout",               lprocfs_rd_capa_timeout,
+                                       lprocfs_wr_capa_timeout,
+                                       NULL, NULL, 0 },
+       { "capa_key_timeout",           lprocfs_rd_ck_timeout,
+                                       lprocfs_wr_ck_timeout,
+                                       NULL, NULL, 0 },
+       { "capa_count",                 lprocfs_rd_capa_count, NULL,
+                                       NULL, NULL, 0 },
+       { "site_stats",                 lprocfs_rd_site_stats, NULL,
+                                       NULL, NULL, 0 },
+       { "evict_client",               NULL, lprocfs_mdt_wr_evict_client,
+                                       NULL, NULL, 0 },
+       { "hash_stats",                 lprocfs_obd_rd_hash, NULL,
+                                       NULL, NULL, 0 },
+       { "sec_level",                  lprocfs_rd_sec_level,
+                                       lprocfs_wr_sec_level,
+                                       NULL, NULL, 0 },
+       { "commit_on_sharing",          lprocfs_rd_cos, lprocfs_wr_cos,
+                                       NULL, NULL, 0 },
+       { "root_squash",                lprocfs_rd_root_squash,
+                                       lprocfs_wr_root_squash,
+                                       NULL, NULL, 0 },
+       { "nosquash_nids",              lprocfs_rd_nosquash_nids,
+                                       lprocfs_wr_nosquash_nids,
+                                       NULL, NULL, 0 },
+       { "som",                        lprocfs_rd_mdt_som,
+                                       lprocfs_wr_mdt_som,
+                                       NULL, NULL, 0 },
+       { "instance",                   lprocfs_target_rd_instance, NULL,
+                                       NULL, NULL, 0},
+       { "ir_factor",                  lprocfs_obd_rd_ir_factor,
+                                       lprocfs_obd_wr_ir_factor,
+                                       NULL, NULL, 0 },
        { "job_cleanup_interval",       lprocfs_rd_job_interval,
        { "job_cleanup_interval",       lprocfs_rd_job_interval,
-                                       lprocfs_wr_job_interval, 0 },
+                                       lprocfs_wr_job_interval,
+                                       NULL, NULL, 0 },
        { "enable_remote_dir",          lprocfs_rd_enable_remote_dir,
        { "enable_remote_dir",          lprocfs_rd_enable_remote_dir,
-                                       lprocfs_wr_enable_remote_dir,       0},
+                                       lprocfs_wr_enable_remote_dir,
+                                       NULL, NULL, 0},
        { "enable_remote_dir_gid",      lprocfs_rd_enable_remote_dir_gid,
        { "enable_remote_dir_gid",      lprocfs_rd_enable_remote_dir_gid,
-                                       lprocfs_wr_enable_remote_dir_gid,   0},
+                                       lprocfs_wr_enable_remote_dir_gid,
+                                       NULL, NULL, 0},
+       { "hsm_control",                lprocfs_rd_hsm_cdt_control,
+                                       lprocfs_wr_hsm_cdt_control,
+                                       NULL, NULL, 0 },
        { 0 }
 };
 
 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
        { 0 }
 };
 
 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
-        { "num_refs",                   lprocfs_rd_numrefs,              0, 0 },
+       { "num_refs",                   lprocfs_rd_numrefs, NULL,
+                                       NULL, NULL, 0 },
         { 0 }
 };
 
         { 0 }
 };
 
@@ -1003,12 +1032,12 @@ void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars)
 }
 
 struct lprocfs_vars lprocfs_mds_obd_vars[] = {
 }
 
 struct lprocfs_vars lprocfs_mds_obd_vars[] = {
-       { "uuid",        lprocfs_rd_uuid,       0, 0 },
+       { "uuid",       lprocfs_rd_uuid, NULL, NULL, NULL, 0 },
        { 0 }
 };
 
 struct lprocfs_vars lprocfs_mds_module_vars[] = {
        { 0 }
 };
 
 struct lprocfs_vars lprocfs_mds_module_vars[] = {
-       { "num_refs",     lprocfs_rd_numrefs,     0, 0 },
+       { "num_refs",   lprocfs_rd_numrefs, NULL, NULL, NULL, 0 },
        { 0 }
 };
 
        { 0 }
 };
 
index 111d707..f996438 100644 (file)
@@ -185,7 +185,7 @@ EXPORT_SYMBOL(lustre_buf2hsm);
  * \param buf - is the output buffer where to pack the on-disk HSM xattr.
  * \param mh  - is the md_hsm structure to pack.
  */
  * \param buf - is the output buffer where to pack the on-disk HSM xattr.
  * \param mh  - is the md_hsm structure to pack.
  */
-void lustre_hsm2buf(void *buf, struct md_hsm *mh)
+void lustre_hsm2buf(void *buf, const struct md_hsm *mh)
 {
        struct hsm_attrs *attrs = (struct hsm_attrs *)buf;
        ENTRY;
 {
        struct hsm_attrs *attrs = (struct hsm_attrs *)buf;
        ENTRY;