* (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
* alternatives
*
+ * Copyright (c) 2013, 2017, Intel Corporation.
*/
/*
* lustre/mdt/mdt_hsm_cdt_client.c
#define DEBUG_SUBSYSTEM S_MDS
#include <obd_support.h>
-#include <lustre_net.h>
#include <lustre_export.h>
#include <obd.h>
-#include <obd_lov.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include "mdt_internal.h"
/**
- * data passed to llog_cat_process() callback
- * to find compatible requests
- */
-struct hsm_compat_data_cb {
- struct coordinator *cdt;
- struct hsm_action_list *hal;
-};
-
-/**
* llog_cat_process() callback, used to find record
* compatibles with a new hsm_action_list
* \param env [IN] environment
* \param llh [IN] llog handle
* \param hdr [IN] llog record
- * \param data [IN] cb data = hsm_compat_data_cb
+ * \param data [IN] cb data = hal
* \retval 0 success
* \retval -ve failure
*/
struct llog_handle *llh,
struct llog_rec_hdr *hdr, void *data)
{
- struct llog_agent_req_rec *larr;
- struct hsm_compat_data_cb *hcdcb;
- struct hsm_action_item *hai;
- int i;
+ struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
+ struct hsm_action_list *hal = data;
+ struct hsm_action_item *hai;
+ int i;
ENTRY;
- larr = (struct llog_agent_req_rec *)hdr;
- hcdcb = data;
/* a compatible request must be WAITING or STARTED
* and not a cancel */
if ((larr->arr_status != ARS_WAITING &&
larr->arr_hai.hai_action == HSMA_CANCEL)
RETURN(0);
- hai = hai_first(hcdcb->hal);
- for (i = 0; i < hcdcb->hal->hal_count; i++, hai = hai_next(hai)) {
+ hai = hai_first(hal);
+ for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
/* if request is a CANCEL:
* if cookie set in the request, there is no need to find a
* compatible one, the cookie in the request is directly used.
if (!lu_fid_eq(&hai->hai_fid, &larr->arr_hai.hai_fid))
continue;
- /* HSMA_NONE is used to find running request for some FID */
- if (hai->hai_action == HSMA_NONE) {
- hcdcb->hal->hal_archive_id = larr->arr_archive_id;
- hcdcb->hal->hal_flags = larr->arr_flags;
- *hai = larr->arr_hai;
- continue;
- }
/* in V1 we do not manage partial transfer
* so extent is always whole file
*/
hai->hai_cookie = larr->arr_hai.hai_cookie;
/* we read the archive number from the request we cancel */
- if (hai->hai_action == HSMA_CANCEL &&
- hcdcb->hal->hal_archive_id == 0)
- hcdcb->hal->hal_archive_id = larr->arr_archive_id;
+ if (hai->hai_action == HSMA_CANCEL && hal->hal_archive_id == 0)
+ hal->hal_archive_id = larr->arr_archive_id;
}
RETURN(0);
}
static int hsm_find_compatible(const struct lu_env *env, struct mdt_device *mdt,
struct hsm_action_list *hal)
{
- struct hsm_action_item *hai;
- struct hsm_compat_data_cb hcdcb;
- int rc, i, ok_cnt;
+ struct hsm_action_item *hai;
+ int rc = 0, i;
+ bool check = false;
ENTRY;
- ok_cnt = 0;
+ /* Validate every item of the list and decide whether the recorded
+ * requests have to be scanned: the scan (hsm_find_compatible_cb) is
+ * only run when at least one CANCEL item carries no cookie.
+ * Returns -EINVAL on an HSMA_NONE item, otherwise 0 or the result
+ * of cdt_llog_process().
+ */
hai = hai_first(hal);
for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
- /* in a cancel request hai_cookie may be set by caller to
- * show the request to be canceled
- * if not we need to search by FID
+ /* We only support ARCHIVE, RESTORE, REMOVE and CANCEL here. */
+ if (hai->hai_action == HSMA_NONE)
+ RETURN(-EINVAL);
+
+ /* In a cancel request hai_cookie may be set by caller to show
+ * the request to be canceled. If there is at least one cancel
+ * request that does not have a cookie set we need to search by
+ * FID; we can skip the llog scan in all other cases.
+ * Do not leave the loop early: every remaining item must still
+ * go through the HSMA_NONE check above, whatever its position
+ * in the list.
*/
- if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
- ok_cnt++;
- else
- hai->hai_cookie = 0;
+ if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie == 0)
+ check = true;
}
- /* if all requests are cancel with cookie, no need to find compatible */
- if (ok_cnt == hal->hal_count)
- RETURN(0);
-
- hcdcb.cdt = &mdt->mdt_coordinator;
- hcdcb.hal = hal;
-
- rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb);
+ /* rc keeps its initial value 0 when no cookie-less cancel was found */
+ if (check)
+ rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, hal, 0,
+ 0, READ);
RETURN(rc);
}
is_needed = true;
break;
}
- CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags="LPX64
- " extent="LPX64"-"LPX64" hsm_flags=%X %s\n",
+ CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags=%#llx"
+ " extent=%#llx-%#llx hsm_flags=%X %s\n",
PFID(&hai->hai_fid),
hsm_copytool_action2name(hai->hai_action), rq_flags,
hai->hai_extent.offset, hai->hai_extent.length,
RETURN(true);
}
-/*
- * Coordinator external API
- */
-
-/**
- * register a list of requests
- * \param mti [IN]
- * \param hal [IN] list of requests
- * \param compound_id [OUT] id of the compound request
- * \retval 0 success
- * \retval -ve failure
- * in case of restore, caller must hold layout lock
- */
-int mdt_hsm_add_actions(struct mdt_thread_info *mti,
- struct hsm_action_list *hal, __u64 *compound_id)
+/**
+ * check whether the caller may submit an HSM action on an object
+ * \param mti [IN] thread info; supplies the export, the user credentials
+ *                 and the md_attr buffer used for the lookup
+ * \param obj [IN] object whose owner uid/gid are compared with the caller
+ * \param hsma [IN] requested action, used as a bit index in the masks
+ * \retval 0 action allowed (always the case when CAP_SYS_ADMIN is raised)
+ * \retval -EROFS action is not a RESTORE and mdt_rdonly() is true
+ * \retval -EINVAL hsma cannot be used as a bit index in a __u64 mask
+ * \retval -EPERM the bit for hsma is not set in the selected mask
+ * \retval -ve error from mdt_attr_get_complex()
+ */
+static int
+hsm_action_permission(struct mdt_thread_info *mti,
+ struct mdt_object *obj,
+ enum hsm_copytool_action hsma)
{
- struct mdt_device *mdt = mti->mti_mdt;
- struct coordinator *cdt = &mdt->mdt_coordinator;
- struct hsm_action_item *hai;
- struct mdt_object *obj = NULL;
- int rc = 0, i;
- struct md_hsm mh;
- bool is_restore = false;
+ struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
+ struct lu_ucred *uc = mdt_ucred(mti);
+ struct md_attr *ma = &mti->mti_attr;
+ const __u64 *mask;
+ int rc;
ENTRY;
- /* no coordinator started, so we cannot serve requests */
- if (cdt->cdt_state == CDT_STOPPED)
- RETURN(-EAGAIN);
+ if (hsma != HSMA_RESTORE && mdt_rdonly(mti->mti_exp))
+ RETURN(-EROFS);
- if (!hal_is_sane(hal))
+ if (cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
+ RETURN(0);
+
+ ma->ma_need = MA_INODE;
+ rc = mdt_attr_get_complex(mti, obj, ma);
+ if (rc < 0)
+ RETURN(rc);
+
+ /* pick the owner, group or other request mask of the coordinator */
+ if (uc->uc_fsuid == ma->ma_attr.la_uid)
+ mask = &cdt->cdt_user_request_mask;
+ else if (lustre_in_group_p(uc, ma->ma_attr.la_gid))
+ mask = &cdt->cdt_group_request_mask;
+ else
+ mask = &cdt->cdt_other_request_mask;
+
+ if (!(0 <= hsma && hsma < 8 * sizeof(*mask)))
RETURN(-EINVAL);
- *compound_id = atomic_inc_return(&cdt->cdt_compound_id);
+ /* The range check above admits bit indices up to 63, so the bit must
+ * be built from a 64-bit constant: 1UL is only 32 bits wide on ILP32
+ * targets and shifting it by 32 or more is undefined.
+ */
+ RETURN(*mask & (1ULL << hsma) ? 0 : -EPERM);
+}
- /* search for compatible request, if found hai_cookie is set
- * to the request cookie
- * it is also used to set the cookie for cancel request by FID
- */
- rc = hsm_find_compatible(mti->mti_env, mdt, hal);
- if (rc)
- GOTO(out, rc);
+/* Process a single HAL. hsm_find_compatible has already been called
+ * on it. */
+static int mdt_hsm_register_hal(struct mdt_thread_info *mti,
+ struct mdt_device *mdt,
+ struct coordinator *cdt,
+ struct hsm_action_list *hal)
+{
+ struct hsm_action_item *hai;
+ struct mdt_object *obj = NULL;
+ int rc, i;
+ struct md_hsm mh;
+ bool is_restore = false;
hai = hai_first(hal);
for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
* if restore, we take the layout lock
*/
- /* if action is cancel, also no need to check */
- if (hai->hai_action == HSMA_CANCEL)
- goto record;
-
- /* get HSM attributes */
+ /* Get HSM attributes and check permissions. */
obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh);
- if (IS_ERR(obj) || obj == NULL) {
- /* in case of archive remove, Lustre file
- * is not mandatory */
- if (hai->hai_action == HSMA_REMOVE)
+ if (IS_ERR(obj)) {
+ /* In case of REMOVE and CANCEL a Lustre file
+ * is not mandatory, but restrict this
+ * exception to admins. */
+ if (cap_raised(mdt_ucred(mti)->uc_cap, CAP_SYS_ADMIN) &&
+ (hai->hai_action == HSMA_REMOVE ||
+ hai->hai_action == HSMA_CANCEL))
goto record;
- if (obj == NULL)
- GOTO(out, rc = -ENOENT);
- GOTO(out, rc = PTR_ERR(obj));
+ else
+ GOTO(out, rc = PTR_ERR(obj));
}
+
+ rc = hsm_action_permission(mti, obj, hai->hai_action);
mdt_object_put(mti->mti_env, obj);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ /* if action is cancel, also no need to check */
+ if (hai->hai_action == HSMA_CANCEL)
+ goto record;
+
/* Check if an action is needed, compare request
* and HSM flags status */
if (!hsm_action_is_needed(hai, archive_id, flags, &mh))
* or we use the default if none found in lma
* this works also for archive because the default value is 0
* /!\ there is a side effect: in case of restore on multiple
- * files which are in different backend, the initial compound
+ * files which are in different backend, the initial
* request will be split in multiple requests because we cannot
* warranty an agent can serve any combinaison of archive
* backend
if (mh.mh_arch_id != 0)
archive_id = mh.mh_arch_id;
else
- archive_id = cdt->cdt_archive_id;
+ archive_id = cdt->cdt_default_archive_id;
}
/* if restore, take an exclusive lock on layout */
if (hai->hai_action == HSMA_RESTORE) {
- struct cdt_restore_handle *crh;
- struct mdt_object *child;
-
- OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
- if (crh == NULL)
- GOTO(out, rc = -ENOMEM);
-
- crh->crh_fid = hai->hai_fid;
- /* in V1 only whole file is supported
- crh->extent.start = hai->hai_extent.offset;
- crh->extent.end = hai->hai_extent.offset +
- hai->hai_extent.length;
- */
- crh->crh_extent.start = 0;
- crh->crh_extent.end = OBD_OBJECT_EOF;
-
- mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
- child = mdt_object_find_lock(mti, &crh->crh_fid,
- &crh->crh_lh,
- MDS_INODELOCK_LAYOUT);
- if (IS_ERR(child)) {
- rc = PTR_ERR(child);
- CERROR("%s: cannot take layout lock for "
- DFID": rc = %d\n", mdt_obd_name(mdt),
- PFID(&crh->crh_fid), rc);
- OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+ /* in V1 only whole file is supported. */
+ if (hai->hai_extent.offset != 0)
+ GOTO(out, rc = -EPROTO);
+
+ rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid,
+ &hai->hai_extent);
+ if (rc < 0)
GOTO(out, rc);
- }
- /* we choose to not keep a keep a reference
- * on the object during the restore time which can be
- * very long */
- mdt_object_put(mti->mti_env, child);
-
- mutex_lock(&cdt->cdt_restore_lock);
- list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
- mutex_unlock(&cdt->cdt_restore_lock);
}
record:
+ /*
+ * Wait here to catch the 2nd RESTORE request to the same FID.
+ * Normally the layout lock protects against adding such a request.
+ * But when the cdt is stopping it cancels all locks via
+ * ldlm_resource_clean and this protection may not work.
+ * See LU-9266 and sanity-hsm_407 for details.
+ */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_HSM_CDT_DELAY, cfs_fail_val);
/* record request */
- rc = mdt_agent_record_add(mti->mti_env, mdt, *compound_id,
- archive_id, flags, hai);
+ rc = mdt_agent_record_add(mti->mti_env, mdt, archive_id, flags,
+ hai);
if (rc)
GOTO(out, rc);
}
rc = 0;
GOTO(out, rc);
-out:
- /* if work has been added, wake up coordinator */
- if (rc == 0 || rc == -ENODATA)
- mdt_hsm_cdt_wakeup(mdt);
+out:
return rc;
}
+/*
+ * Coordinator external API
+ */
+
/**
- * get running action on a FID list or from cookie
+ * register a list of requests
* \param mti [IN]
- * \param hal [IN/OUT] requests
+ * \param hal [IN] list of requests
* \retval 0 success
* \retval -ve failure
+ * \retval -EAGAIN coordinator state is CDT_STOPPED or CDT_INIT
+ * \retval -EINVAL the list is rejected by hal_is_sane()
+ * in case of restore, caller must hold layout lock
*/
-int mdt_hsm_get_running(struct mdt_thread_info *mti,
+int mdt_hsm_add_actions(struct mdt_thread_info *mti,
struct hsm_action_list *hal)
{
struct mdt_device *mdt = mti->mti_mdt;
struct coordinator *cdt = &mdt->mdt_coordinator;
- struct hsm_action_item *hai;
- int i;
+ int rc;
ENTRY;
- hai = hai_first(hal);
- for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
- struct cdt_agent_req *car;
+ /* no coordinator started, so we cannot serve requests */
+ if (cdt->cdt_state == CDT_STOPPED || cdt->cdt_state == CDT_INIT)
+ RETURN(-EAGAIN);
- if (!fid_is_sane(&hai->hai_fid))
- RETURN(-EINVAL);
+ if (!hal_is_sane(hal))
+ RETURN(-EINVAL);
- car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
- if (car == NULL) {
- hai->hai_cookie = 0;
- hai->hai_action = HSMA_NONE;
- } else {
- *hai = *car->car_hai;
- mdt_cdt_put_request(car);
- }
- }
- RETURN(0);
+ /* search for compatible request, if found hai_cookie is set
+ * to the request cookie
+ * it is also used to set the cookie for cancel request by FID
+ * on failure nothing is registered, but the common exit path
+ * below is still taken
+ */
+ rc = hsm_find_compatible(mti->mti_env, mdt, hal);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdt_hsm_register_hal(mti, mdt, cdt, hal);
+
+ GOTO(out, rc);
+out:
+ /* if work has been added, signal the coordinator
+ * NOTE(review): -ENODATA is treated like success here; confirm
+ * which callee returns it and why it still means "work added"
+ */
+ if (rc == 0 || rc == -ENODATA)
+ mdt_hsm_cdt_event(cdt);
+
+ return rc;
}
/**
bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
const struct lu_fid *fid)
{
- struct mdt_device *mdt = mti->mti_mdt;
- struct coordinator *cdt = &mdt->mdt_coordinator;
- struct cdt_restore_handle *crh;
- bool rc = false;
+ struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
+ bool is_running; /* true when a restore handle is found for fid */
ENTRY;
- if (!fid_is_sane(fid))
- RETURN(rc);
-
mutex_lock(&cdt->cdt_restore_lock);
- list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
- if (lu_fid_eq(&crh->crh_fid, fid)) {
- rc = true;
- break;
- }
- }
+ is_running = (cdt_restore_handle_find(cdt, fid) != NULL); /* lookup runs with cdt_restore_lock held */
mutex_unlock(&cdt->cdt_restore_lock);
- RETURN(rc);
+
+ RETURN(is_running);
+}
+
+struct hsm_get_action_data { /* in/out argument of hsm_get_action_cb() */
+ const struct lu_fid *hgad_fid; /* in: FID the records are compared with */
+ struct hsm_action_item hgad_hai; /* out: copy of the matching record's action item */
+ enum agent_req_status hgad_status; /* out: arr_status of the matching record */
+};
+
+/**
+ * llog processing callback used by mdt_hsm_get_action()
+ * Stops at the first record which is WAITING or STARTED, is not a
+ * cancel and targets the requested FID, and copies its action item and
+ * status into the callback data.
+ * \param env [IN] environment (unused here)
+ * \param llh [IN] llog handle (unused here)
+ * \param hdr [IN] llog record, read as a llog_agent_req_rec
+ * \param data [IN/OUT] cb data = struct hsm_get_action_data
+ * \retval 0 record does not match, keep processing
+ * \retval LLOG_PROC_BREAK matching record found and copied
+ */
+static int hsm_get_action_cb(const struct lu_env *env,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *hdr, void *data)
+{
+ struct hsm_get_action_data *found = data;
+ struct llog_agent_req_rec *rec = (struct llog_agent_req_rec *)hdr;
+
+ /* Only records still WAITING or already STARTED are candidates. */
+ if (rec->arr_status != ARS_WAITING && rec->arr_status != ARS_STARTED)
+ RETURN(0);
+
+ /* A cancel is never reported as the registered action. */
+ if (rec->arr_hai.hai_action == HSMA_CANCEL)
+ RETURN(0);
+
+ /* The record must target the FID we were asked about. */
+ if (!lu_fid_eq(&rec->arr_hai.hai_fid, found->hgad_fid))
+ RETURN(0);
+
+ found->hgad_status = rec->arr_status;
+ found->hgad_hai = rec->arr_hai;
+
+ RETURN(LLOG_PROC_BREAK);
}
/**
- * get registered action on a FID list
+ * get registered action on a FID
* \param mti [IN]
- * \param hal [IN/OUT] requests
+ * \param fid [IN] FID whose recorded request is looked up
+ * \param action [OUT] action of the matching record, HSMA_NONE when no
+ *                     record matched
+ * \param status [OUT] status of the matching record
+ * \param extent [OUT] extent of the matching record; when the request is
+ *                     STARTED and mdt_cdt_find_request() finds it, length
+ *                     is replaced by the progress total
* \retval 0 success
* \retval -ve failure
*/
-int mdt_hsm_get_actions(struct mdt_thread_info *mti,
- struct hsm_action_list *hal)
+int mdt_hsm_get_action(struct mdt_thread_info *mti,
+ const struct lu_fid *fid,
+ enum hsm_copytool_action *action,
+ enum agent_req_status *status,
+ struct hsm_extent *extent)
{
- struct mdt_device *mdt = mti->mti_mdt;
- struct coordinator *cdt = &mdt->mdt_coordinator;
- struct hsm_action_item *hai;
- int i, rc;
+ const struct lu_env *env = mti->mti_env;
+ struct mdt_device *mdt = mti->mti_mdt;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct hsm_get_action_data hgad = {
+ .hgad_fid = fid,
+ .hgad_hai.hai_action = HSMA_NONE,
+ };
+ struct cdt_agent_req *car;
+ int rc;
ENTRY;
- hai = hai_first(hal);
- for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
- hai->hai_action = HSMA_NONE;
- if (!fid_is_sane(&hai->hai_fid))
- RETURN(-EINVAL);
- }
-
/* 1st we search in recorded requests */
- rc = hsm_find_compatible(mti->mti_env, mdt, hal);
- /* if llog file is not created, no action is recorded */
- if (rc == -ENOENT)
- RETURN(0);
-
- if (rc)
+ rc = cdt_llog_process(env, mdt, hsm_get_action_cb, &hgad, 0, 0, READ);
+ if (rc < 0)
RETURN(rc);
- /* 2nd we search if the request are running
- * cookie is cleared to tell to caller, the request is
- * waiting
- * we could in place use the record status, but in the future
- * we may want do give back dynamic informations on the
- * running request
- */
- hai = hai_first(hal);
- for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
- struct cdt_agent_req *car;
-
- car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
- if (car == NULL) {
- hai->hai_cookie = 0;
- } else {
- __u64 data_moved;
-
- mdt_cdt_get_work_done(car, &data_moved);
- /* this is just to give the volume of data moved
- * it means data_moved data have been moved from the
- * original request but we do not know which one
- */
- hai->hai_extent.length = data_moved;
- mdt_cdt_put_request(car);
- }
+ *action = hgad.hgad_hai.hai_action;
+ *extent = hgad.hgad_hai.hai_extent;
+ *status = hgad.hgad_status;
+
+ if (*action == HSMA_NONE || *status != ARS_STARTED)
+ RETURN(0);
+
+ car = mdt_cdt_find_request(cdt, hgad.hgad_hai.hai_cookie);
+ if (car) {
+ /* This is just to give the volume of data moved.
+ * It means 'car_progress' data have been moved from the
+ * original request but we do not know which one.
+ * The recorded extent length is overwritten by that total.
+ */
+ extent->length = car->car_progress.crp_total;
+ mdt_cdt_put_request(car);
}
RETURN(0);
}
-