X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_hsm_cdt_client.c;h=9d0335c2752fc227e928c0dacaf1b4e0f02b791a;hp=8264070bed20c2391918a92e9ae045baff07d052;hb=ccabce23bd9e366c345c852f565766a799f61238;hpb=4b02a71242504c1aa51fa5bca87721031bf670ae diff --git a/lustre/mdt/mdt_hsm_cdt_client.c b/lustre/mdt/mdt_hsm_cdt_client.c index 8264070..9d0335c 100644 --- a/lustre/mdt/mdt_hsm_cdt_client.c +++ b/lustre/mdt/mdt_hsm_cdt_client.c @@ -23,6 +23,7 @@ * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies * alternatives * + * Copyright (c) 2013, 2017, Intel Corporation. */ /* * lustre/mdt/mdt_hsm_cdt_client.c @@ -36,10 +37,8 @@ #define DEBUG_SUBSYSTEM S_MDS #include -#include #include #include -#include #include #include #include "mdt_internal.h" @@ -155,7 +154,8 @@ static int hsm_find_compatible(const struct lu_env *env, struct mdt_device *mdt, hcdcb.cdt = &mdt->mdt_coordinator; hcdcb.hal = hal; - rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb); + rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb, 0, 0, + READ); RETURN(rc); } @@ -200,8 +200,8 @@ static bool hsm_action_is_needed(struct hsm_action_item *hai, int hal_an, is_needed = true; break; } - CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags="LPX64 - " extent="LPX64"-"LPX64" hsm_flags=%X %s\n", + CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags=%#llx" + " extent=%#llx-%#llx hsm_flags=%X %s\n", PFID(&hai->hai_fid), hsm_copytool_action2name(hai->hai_action), rq_flags, hai->hai_extent.offset, hai->hai_extent.length, @@ -245,47 +245,57 @@ static bool hal_is_sane(struct hsm_action_list *hal) RETURN(true); } -/* - * Coordinator external API - */ - -/** - * register a list of requests - * \param mti [IN] - * \param hal [IN] list of requests - * \param compound_id [OUT] id of the compound request - * \retval 0 success - * \retval -ve failure - * in case of restore, caller must hold layout lock - */ -int mdt_hsm_add_actions(struct mdt_thread_info *mti, - struct hsm_action_list *hal, __u64 *compound_id) +static int +hsm_action_permission(struct mdt_thread_info *mti, + struct mdt_object *obj, + enum hsm_copytool_action hsma) { - struct mdt_device *mdt = mti->mti_mdt; - struct coordinator *cdt = &mdt->mdt_coordinator; - struct hsm_action_item *hai; - struct mdt_object *obj = NULL; - int rc = 0, i; - struct md_hsm mh; - bool is_restore = false; + struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator; + struct lu_ucred *uc = mdt_ucred(mti); + struct md_attr *ma = &mti->mti_attr; + const __u64 *mask; + int rc; ENTRY; - /* no coordinator started, so we cannot serve requests */ - if (cdt->cdt_state == CDT_STOPPED) - RETURN(-EAGAIN); + if (hsma != HSMA_RESTORE && mdt_rdonly(mti->mti_exp)) + RETURN(-EROFS); - if (!hal_is_sane(hal)) + if (md_capable(uc, CFS_CAP_SYS_ADMIN)) + RETURN(0); + + ma->ma_need = MA_INODE; + rc = mdt_attr_get_complex(mti, obj, ma); + if (rc < 0) + RETURN(rc); + + if (uc->uc_fsuid == ma->ma_attr.la_uid) + mask = &cdt->cdt_user_request_mask; + else if (lustre_in_group_p(uc, ma->ma_attr.la_gid)) + mask = &cdt->cdt_group_request_mask; + else + mask = &cdt->cdt_other_request_mask; + + if (!(0 <= hsma && hsma < 8 * sizeof(*mask))) RETURN(-EINVAL); - *compound_id = atomic_inc_return(&cdt->cdt_compound_id); + RETURN(*mask & (1UL << hsma) ? 0 : -EPERM); +} - /* search for compatible request, if found hai_cookie is set - * to the request cookie - * it is also used to set the cookie for cancel request by FID - */ - rc = hsm_find_compatible(mti->mti_env, mdt, hal); - if (rc) - GOTO(out, rc); +/* Process a single HAL. hsm_find_compatible has already been called + * on it. */ +static int mdt_hsm_register_hal(struct mdt_thread_info *mti, + struct mdt_device *mdt, + struct coordinator *cdt, + struct hsm_action_list *hal) +{ + struct hsm_action_item *hai; + struct mdt_object *obj = NULL; + int rc, i; + struct md_hsm mh; + bool is_restore = false; + __u64 compound_id; + + compound_id = atomic_inc_return(&cdt->cdt_compound_id); hai = hai_first(hal); for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) { @@ -321,23 +331,30 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti, * if restore, we take the layout lock */ - /* if action is cancel, also no need to check */ - if (hai->hai_action == HSMA_CANCEL) - goto record; - - /* get HSM attributes */ + /* Get HSM attributes and check permissions. */ obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh); - if (IS_ERR(obj) || obj == NULL) { - /* in case of archive remove, Lustre file - * is not mandatory */ - if (hai->hai_action == HSMA_REMOVE) + if (IS_ERR(obj)) { + /* In case of REMOVE and CANCEL a Lustre file + * is not mandatory, but restrict this + * exception to admins. */ + if (md_capable(mdt_ucred(mti), CFS_CAP_SYS_ADMIN) && + (hai->hai_action == HSMA_REMOVE || + hai->hai_action == HSMA_CANCEL)) goto record; - if (obj == NULL) - GOTO(out, rc = -ENOENT); - GOTO(out, rc = PTR_ERR(obj)); + else + GOTO(out, rc = PTR_ERR(obj)); } + + rc = hsm_action_permission(mti, obj, hai->hai_action); mdt_object_put(mti->mti_env, obj); + if (rc < 0) + GOTO(out, rc); + + /* if action is cancel, also no need to check */ + if (hai->hai_action == HSMA_CANCEL) + goto record; + /* Check if an action is needed, compare request * and HSM flags status */ if (!hsm_action_is_needed(hai, archive_id, flags, &mh)) @@ -351,6 +368,7 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti, /* for cancel archive number is taken from canceled request * for other request, we take from lma if not specified, + * or we use the default if none found in lma * this works also for archive because the default value is 0 * /!\ there is a side effect: in case of restore on multiple * files which are in different backend, the initial compound @@ -358,51 +376,71 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti, * warranty an agent can serve any combinaison of archive * backend */ - if (hai->hai_action != HSMA_CANCEL && archive_id == 0) - archive_id = mh.mh_arch_id; + if (hai->hai_action != HSMA_CANCEL && archive_id == 0) { + if (mh.mh_arch_id != 0) + archive_id = mh.mh_arch_id; + else + archive_id = cdt->cdt_default_archive_id; + } /* if restore, take an exclusive lock on layout */ if (hai->hai_action == HSMA_RESTORE) { - struct cdt_restore_handle *crh; - struct mdt_object *child; + struct cdt_restore_handle *crh; + + /* in V1 only whole file is supported. */ + if (hai->hai_extent.offset != 0) + GOTO(out, rc = -EPROTO); OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem); if (crh == NULL) GOTO(out, rc = -ENOMEM); crh->crh_fid = hai->hai_fid; - /* in V1 only whole file is supported - crh->extent.start = hai->hai_extent.offset; - crh->extent.end = hai->hai_extent.offset + - hai->hai_extent.length; - */ + /* in V1 only whole file is supported. However the + * restore may be due to truncate. */ crh->crh_extent.start = 0; - crh->crh_extent.end = OBD_OBJECT_EOF; + crh->crh_extent.end = hai->hai_extent.length; mdt_lock_reg_init(&crh->crh_lh, LCK_EX); - child = mdt_object_find_lock(mti, &crh->crh_fid, - &crh->crh_lh, - MDS_INODELOCK_LAYOUT); - if (IS_ERR(child)) { - rc = PTR_ERR(child); + obj = mdt_object_find_lock(mti, &crh->crh_fid, + &crh->crh_lh, + MDS_INODELOCK_LAYOUT); + if (IS_ERR(obj)) { + rc = PTR_ERR(obj); CERROR("%s: cannot take layout lock for " DFID": rc = %d\n", mdt_obd_name(mdt), PFID(&crh->crh_fid), rc); OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem); GOTO(out, rc); } + /* we choose to not keep a keep a reference * on the object during the restore time which can be * very long */ - mdt_object_put(mti->mti_env, child); + mdt_object_put(mti->mti_env, obj); mutex_lock(&cdt->cdt_restore_lock); + if (unlikely((cdt->cdt_state == CDT_STOPPED) || + (cdt->cdt_state == CDT_STOPPING))) { + mutex_unlock(&cdt->cdt_restore_lock); + mdt_object_unlock(mti, NULL, &crh->crh_lh, 1); + OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem); + GOTO(out, rc = -EAGAIN); + } list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl); mutex_unlock(&cdt->cdt_restore_lock); } record: + /* + * Wait here to catch the 2nd RESTORE request to the same FID. + * Normally layout lock protects against adding such request. + * But when cdt is stopping it cancel all locks via + * ldlm_resource_clean and protections may not work. + * See LU-9266 and sanity-hsm_407 for details. + */ + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_HSM_CDT_DELAY, cfs_fail_val); /* record request */ - rc = mdt_agent_record_add(mti->mti_env, mdt, *compound_id, + rc = mdt_agent_record_add(mti->mti_env, mdt, compound_id, archive_id, flags, hai); if (rc) GOTO(out, rc); @@ -414,47 +452,55 @@ record: rc = 0; GOTO(out, rc); -out: - /* if work has been added, wake up coordinator */ - if (rc == 0 || rc == -ENODATA) - mdt_hsm_cdt_wakeup(mdt); +out: return rc; } +/* + * Coordinator external API + */ + /** - * get running action on a FID list or from cookie + * register a list of requests * \param mti [IN] - * \param hal [IN/OUT] requests + * \param hal [IN] list of requests * \retval 0 success * \retval -ve failure + * in case of restore, caller must hold layout lock */ -int mdt_hsm_get_running(struct mdt_thread_info *mti, +int mdt_hsm_add_actions(struct mdt_thread_info *mti, struct hsm_action_list *hal) { struct mdt_device *mdt = mti->mti_mdt; struct coordinator *cdt = &mdt->mdt_coordinator; - struct hsm_action_item *hai; - int i; + int rc; ENTRY; - hai = hai_first(hal); - for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) { - struct cdt_agent_req *car; + /* no coordinator started, so we cannot serve requests */ + if (cdt->cdt_state == CDT_STOPPED) + RETURN(-EAGAIN); - if (!fid_is_sane(&hai->hai_fid)) - RETURN(-EINVAL); + if (!hal_is_sane(hal)) + RETURN(-EINVAL); - car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid); - if (car == NULL) { - hai->hai_cookie = 0; - hai->hai_action = HSMA_NONE; - } else { - *hai = *car->car_hai; - mdt_cdt_put_request(car); - } - } - RETURN(0); + /* search for compatible request, if found hai_cookie is set + * to the request cookie + * it is also used to set the cookie for cancel request by FID + */ + rc = hsm_find_compatible(mti->mti_env, mdt, hal); + if (rc) + GOTO(out, rc); + + rc = mdt_hsm_register_hal(mti, mdt, cdt, hal); + + GOTO(out, rc); +out: + /* if work has been added, signal the coordinator */ + if (rc == 0 || rc == -ENODATA) + mdt_hsm_cdt_event(cdt); + + return rc; } /** @@ -532,7 +578,7 @@ int mdt_hsm_get_actions(struct mdt_thread_info *mti, for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) { struct cdt_agent_req *car; - car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL); + car = mdt_cdt_find_request(cdt, hai->hai_cookie); if (car == NULL) { hai->hai_cookie = 0; } else { @@ -550,4 +596,3 @@ int mdt_hsm_get_actions(struct mdt_thread_info *mti, RETURN(0); } -