* (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
* alternatives
*
+ * Copyright (c) 2013, 2014, Intel Corporation.
*/
/*
* lustre/mdt/mdt_hsm_cdt_client.c
#define DEBUG_SUBSYSTEM S_MDS
#include <obd_support.h>
-#include <lustre_net.h>
#include <lustre_export.h>
#include <obd.h>
-#include <obd_lov.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include "mdt_internal.h"
hcdcb.cdt = &mdt->mdt_coordinator;
hcdcb.hal = hal;
- rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb);
+ rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb, 0, 0,
+ READ);
RETURN(rc);
}
is_needed = true;
break;
}
- CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags="LPX64
- " extent="LPX64"-"LPX64" hsm_flags=%X %s\n",
+ CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags=%#llx"
+ " extent=%#llx-%#llx hsm_flags=%X %s\n",
PFID(&hai->hai_fid),
hsm_copytool_action2name(hai->hai_action), rq_flags,
hai->hai_extent.offset, hai->hai_extent.length,
RETURN(true);
}
+static int
+hsm_action_permission(struct mdt_thread_info *mti,
+ struct mdt_object *obj,
+ enum hsm_copytool_action hsma)
+{
+ struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
+ struct lu_ucred *uc = mdt_ucred(mti);
+ struct md_attr *ma = &mti->mti_attr;
+ const __u64 *mask;
+ int rc;
+ ENTRY;
+
+ if (hsma != HSMA_RESTORE && mdt_rdonly(mti->mti_exp))
+ RETURN(-EROFS);
+
+ if (md_capable(uc, CFS_CAP_SYS_ADMIN))
+ RETURN(0);
+
+ ma->ma_need = MA_INODE;
+ rc = mdt_attr_get_complex(mti, obj, ma);
+ if (rc < 0)
+ RETURN(rc);
+
+ if (uc->uc_fsuid == ma->ma_attr.la_uid)
+ mask = &cdt->cdt_user_request_mask;
+ else if (lustre_in_group_p(uc, ma->ma_attr.la_gid))
+ mask = &cdt->cdt_group_request_mask;
+ else
+ mask = &cdt->cdt_other_request_mask;
+
+ if (!(0 <= hsma && hsma < 8 * sizeof(*mask)))
+ RETURN(-EINVAL);
+
+ RETURN(*mask & (1UL << hsma) ? 0 : -EPERM);
+}
+
/*
* Coordinator external API
*/
* register a list of requests
* \param mti [IN]
* \param hal [IN] list of requests
- * \param compound_id [OUT] id of the compound request
* \retval 0 success
* \retval -ve failure
* in case of restore, caller must hold layout lock
*/
int mdt_hsm_add_actions(struct mdt_thread_info *mti,
- struct hsm_action_list *hal, __u64 *compound_id)
+ struct hsm_action_list *hal)
{
struct mdt_device *mdt = mti->mti_mdt;
struct coordinator *cdt = &mdt->mdt_coordinator;
int rc = 0, i;
struct md_hsm mh;
bool is_restore = false;
+ __u64 compound_id;
ENTRY;
/* no coordinator started, so we cannot serve requests */
if (!hal_is_sane(hal))
RETURN(-EINVAL);
- *compound_id = atomic_inc_return(&cdt->cdt_compound_id);
+ compound_id = atomic_inc_return(&cdt->cdt_compound_id);
/* search for compatible request, if found hai_cookie is set
* to the request cookie
* if restore, we take the layout lock
*/
- /* if action is cancel, also no need to check */
- if (hai->hai_action == HSMA_CANCEL)
- goto record;
-
- /* get HSM attributes */
+ /* Get HSM attributes and check permissions. */
obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh);
- if (IS_ERR(obj) || obj == NULL) {
- /* in case of archive remove, Lustre file
- * is not mandatory */
- if (hai->hai_action == HSMA_REMOVE)
+ if (IS_ERR(obj)) {
+ /* In case of REMOVE and CANCEL a Lustre file
+ * is not mandatory, but restrict this
+ * exception to admins. */
+ if (md_capable(mdt_ucred(mti), CFS_CAP_SYS_ADMIN) &&
+ (hai->hai_action == HSMA_REMOVE ||
+ hai->hai_action == HSMA_CANCEL))
goto record;
- if (obj == NULL)
- GOTO(out, rc = -ENOENT);
- GOTO(out, rc = PTR_ERR(obj));
+ else
+ GOTO(out, rc = PTR_ERR(obj));
}
+
+ rc = hsm_action_permission(mti, obj, hai->hai_action);
mdt_object_put(mti->mti_env, obj);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ /* if action is cancel, also no need to check */
+ if (hai->hai_action == HSMA_CANCEL)
+ goto record;
+
/* Check if an action is needed, compare request
* and HSM flags status */
if (!hsm_action_is_needed(hai, archive_id, flags, &mh))
if (mh.mh_arch_id != 0)
archive_id = mh.mh_arch_id;
else
- archive_id = cdt->cdt_archive_id;
+ archive_id = cdt->cdt_default_archive_id;
}
/* if restore, take an exclusive lock on layout */
mdt_object_put(mti->mti_env, obj);
mutex_lock(&cdt->cdt_restore_lock);
+ if (unlikely((cdt->cdt_state == CDT_STOPPED) ||
+ (cdt->cdt_state == CDT_STOPPING))) {
+ mutex_unlock(&cdt->cdt_restore_lock);
+ mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
+ OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+ GOTO(out, rc = -EAGAIN);
+ }
list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
mutex_unlock(&cdt->cdt_restore_lock);
}
record:
+ /*
+ * Wait here to catch the 2nd RESTORE request to the same FID.
+ * Normally layout lock protects against adding such request.
+ * But when cdt is stopping it cancel all locks via
+ * ldlm_resource_clean and protections may not work.
+ * See LU-9266 and sanity-hsm_407 for details.
+ */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_HSM_CDT_DELAY, cfs_fail_val);
/* record request */
- rc = mdt_agent_record_add(mti->mti_env, mdt, *compound_id,
+ rc = mdt_agent_record_add(mti->mti_env, mdt, compound_id,
archive_id, flags, hai);
if (rc)
GOTO(out, rc);
GOTO(out, rc);
out:
- /* if work has been added, wake up coordinator */
+ /* if work has been added, signal the coordinator */
if (rc == 0 || rc == -ENODATA)
- mdt_hsm_cdt_wakeup(mdt);
+ mdt_hsm_cdt_event(cdt);
return rc;
}
/**
- * get running action on a FID list or from cookie
- * \param mti [IN]
- * \param hal [IN/OUT] requests
- * \retval 0 success
- * \retval -ve failure
- */
-int mdt_hsm_get_running(struct mdt_thread_info *mti,
- struct hsm_action_list *hal)
-{
- struct mdt_device *mdt = mti->mti_mdt;
- struct coordinator *cdt = &mdt->mdt_coordinator;
- struct hsm_action_item *hai;
- int i;
- ENTRY;
-
- hai = hai_first(hal);
- for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
- struct cdt_agent_req *car;
-
- if (!fid_is_sane(&hai->hai_fid))
- RETURN(-EINVAL);
-
- car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
- if (car == NULL) {
- hai->hai_cookie = 0;
- hai->hai_action = HSMA_NONE;
- } else {
- *hai = *car->car_hai;
- mdt_cdt_put_request(car);
- }
- }
- RETURN(0);
-}
-
-/**
* check if a restore is running on a FID
* this is redundant with mdt_hsm_coordinator_get_running()
* but as it can be called frequently when getting attr
for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
struct cdt_agent_req *car;
- car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
+ car = mdt_cdt_find_request(cdt, hai->hai_cookie);
if (car == NULL) {
hai->hai_cookie = 0;
} else {
RETURN(0);
}
-