X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fqmt_handler.c;h=9d2a4cca6fd61f2cbd639076dc330f5c57dceeae;hp=19e706f49d5dd503fc45e19680633fb975f9121c;hb=c1d0a355a6a64ec97c9f56c38ba036e5e50cd8c4;hpb=294aa9cb666c48e02da1057c222fe5f206ce38fc diff --git a/lustre/quota/qmt_handler.c b/lustre/quota/qmt_handler.c index 19e706f..9d2a4cc 100644 --- a/lustre/quota/qmt_handler.c +++ b/lustre/quota/qmt_handler.c @@ -21,213 +21,135 @@ * GPL HEADER END */ /* - * Copyright (c) 2012 Intel, Inc. + * Copyright (c) 2012, 2017, Intel Corporation. * Use is subject to license terms. * * Author: Johann Lombardi * Author: Niu Yawei */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #define DEBUG_SUBSYSTEM S_LQUOTA #include #include "qmt_internal.h" /* - * Fetch grace time for either inode or block. + * Retrieve quota settings for a given identifier. * * \param env - is the environment passed by the caller * \param qmt - is the quota master target - * \param pool_id - is the 16-bit pool identifier * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode * (i.e. LQUOTA_RES_MD) * \param qtype - is the quota type + * \param id - is the quota indentifier for which we want to acces quota + * settings. + * \param hard - is the output variable where to copy the hard limit + * \param soft - is the output variable where to copy the soft limit * \param time - is the output variable where to copy the grace time */ -static int qmt_getinfo(const struct lu_env *env, struct qmt_device *qmt, - __u16 pool_id, __u8 restype, __u8 qtype, __u64 *time) +static int qmt_get(const struct lu_env *env, struct qmt_device *qmt, + __u8 restype, __u8 qtype, union lquota_id *id, + __u64 *hard, __u64 *soft, __u64 *time, bool is_default) { - struct qmt_thread_info *qti = qmt_info(env); - union lquota_id *id = &qti->qti_id_bis; struct lquota_entry *lqe; ENTRY; - /* Global grace time is stored in quota settings of ID 0. */ - id->qid_uid = 0; + LASSERT(!is_default || id->qid_uid == 0); - /* look-up quota entry storing grace time */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); + /* look-up lqe structure containing quota settings */ + lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, id); if (IS_ERR(lqe)) RETURN(PTR_ERR(lqe)); + /* copy quota settings */ lqe_read_lock(lqe); - LQUOTA_DEBUG(lqe, "getinfo"); - /* copy grace time */ - *time = lqe->lqe_gracetime; + LQUOTA_DEBUG(lqe, "fetch settings"); + if (hard != NULL) + *hard = lqe->lqe_hardlimit; + if (soft != NULL) + *soft = lqe->lqe_softlimit; + if (time != NULL) { + *time = lqe->lqe_gracetime; + if (lqe->lqe_is_default) + *time |= (__u64)LQUOTA_FLAG_DEFAULT << + LQUOTA_GRACE_BITS; + } lqe_read_unlock(lqe); lqe_putref(lqe); RETURN(0); } -/* - * Update grace time for either inode or block. - * Global grace time is stored in quota settings of ID 0. - * - * \param env - is the environment passed by the caller - * \param qmt - is the quota master target - * \param pool_id - is the 16-bit pool identifier - * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode - * (i.e. 
LQUOTA_RES_MD) - * \param qtype - is the quota type - * \param time - is the new grace time - */ -static int qmt_setinfo(const struct lu_env *env, struct qmt_device *qmt, - __u16 pool_id, __u8 restype, __u8 qtype, __u64 time) -{ - struct qmt_thread_info *qti = qmt_info(env); - union lquota_id *id = &qti->qti_id_bis; - struct lquota_entry *lqe; - struct thandle *th = NULL; - int rc; - ENTRY; - - /* Global grace time is stored in quota settings of ID 0. */ - id->qid_uid = 0; - - /* look-up quota entry storing the global grace time */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); - if (IS_ERR(lqe)) - RETURN(PTR_ERR(lqe)); - - /* allocate & start transaction with enough credits to update grace - * time in the global index file */ - th = qmt_trans_start(env, lqe, &qti->qti_restore); - if (IS_ERR(th)) - GOTO(out_nolock, rc = PTR_ERR(th)); - - /* write lock quota entry storing the grace time */ - lqe_write_lock(lqe); - if (lqe->lqe_gracetime == time) - /* grace time is the same */ - GOTO(out, rc = 0); - - LQUOTA_DEBUG(lqe, "setinfo time:"LPU64, time); - - /* set new grace time */ - lqe->lqe_gracetime = time; - /* always set enforced bit for ID 0 to make sure it does not go away */ - lqe->lqe_enforced = true; - - /* write new grace time to disk, no need for version bump */ - rc = qmt_glb_write(env, th, lqe, 0, NULL); - if (rc) { - /* restore initial grace time */ - qmt_restore(lqe, &qti->qti_restore); - GOTO(out, rc); - } - EXIT; -out: - lqe_write_unlock(lqe); -out_nolock: - lqe_putref(lqe); - if (th != NULL && !IS_ERR(th)) - dt_trans_stop(env, qmt->qmt_child, th); - return rc; -} +struct qmt_entry_iter_data { + const struct lu_env *qeid_env; + struct qmt_device *qeid_qmt; +}; -/* - * Retrieve quota settings for a given identifier. - * - * \param env - is the environment passed by the caller - * \param qmt - is the quota master target - * \param pool_id - is the 16-bit pool identifier - * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode - * (i.e. LQUOTA_RES_MD) - * \param qtype - is the quota type - * \param id - is the quota indentifier for which we want to acces quota - * settings. - * \param hard - is the output variable where to copy the hard limit - * \param soft - is the output variable where to copy the soft limit - * \param time - is the output variable where to copy the grace time - */ -static int qmt_getquota(const struct lu_env *env, struct qmt_device *qmt, - __u16 pool_id, __u8 restype, __u8 qtype, - union lquota_id *id, __u64 *hard, __u64 *soft, - __u64 *time) +static int qmt_entry_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, + struct hlist_node *hnode, void *d) { + struct qmt_entry_iter_data *iter = (struct qmt_entry_iter_data *)d; struct lquota_entry *lqe; - ENTRY; - /* look-up lqe structure containing quota settings */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); - if (IS_ERR(lqe)) - RETURN(PTR_ERR(lqe)); + lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash); + LASSERT(atomic_read(&lqe->lqe_ref) > 0); - /* copy quota settings */ - lqe_read_lock(lqe); - LQUOTA_DEBUG(lqe, "getquota"); - *hard = lqe->lqe_hardlimit; - *soft = lqe->lqe_softlimit; - *time = lqe->lqe_gracetime; - lqe_read_unlock(lqe); + if (lqe->lqe_id.qid_uid == 0 || !lqe->lqe_is_default) + return 0; - lqe_putref(lqe); - RETURN(0); + return qmt_set_with_lqe(iter->qeid_env, iter->qeid_qmt, lqe, 0, 0, 0, 0, + true, true); } /* - * Update quota settings for a given identifier. + * Update quota settings for a given lqe. 
* - * \param env - is the environment passed by the caller - * \param qmt - is the quota master target - * \param pool_id - is the 16-bit pool identifier - * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode - * (i.e. LQUOTA_RES_MD) - * \param qtype - is the quota type - * \param id - is the quota indentifier for which we want to modify quota - * settings. - * \param hard - is the new hard limit - * \param soft - is the new soft limit - * \param time - is the new grace time - * \param valid - is the list of settings to change + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param lqe - is the lquota_entry for which we want to modify quota + * settings. + * \param hard - is the new hard limit + * \param soft - is the new soft limit + * \param time - is the new grace time + * \param valid - is the list of settings to change + * \param is_default - true for default quota setting + * \param is_updated - true if the lqe is updated and no need to write back */ -static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt, - __u16 pool_id, __u8 restype, __u8 qtype, - union lquota_id *id, __u64 hard, __u64 soft, __u64 time, - __u32 valid) + +int qmt_set_with_lqe(const struct lu_env *env, struct qmt_device *qmt, + struct lquota_entry *lqe, __u64 hard, __u64 soft, + __u64 time, __u32 valid, bool is_default, bool is_updated) { struct qmt_thread_info *qti = qmt_info(env); - struct lquota_entry *lqe; struct thandle *th = NULL; - __u64 grace, ver; - bool dirtied = false, bump_version = false; + time64_t now; + __u64 ver; + bool dirtied = false; int rc = 0; ENTRY; - /* fetch global grace time */ - rc = qmt_getinfo(env, qmt, pool_id, restype, qtype, &grace); - if (rc) - RETURN(rc); - - /* look-up quota entry associated with this ID */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); - if (IS_ERR(lqe)) - RETURN(PTR_ERR(lqe)); + /* need to write back to global quota file? 
*/ + if (!is_updated) { + /* allocate & start transaction with enough credits to update + * quota settings in the global index file */ + th = qmt_trans_start(env, lqe, &qti->qti_restore); + if (IS_ERR(th)) + GOTO(out_nolock, rc = PTR_ERR(th)); + } - /* allocate & start transaction with enough credits to update quota - * settings in the global index file */ - th = qmt_trans_start(env, lqe, &qti->qti_restore); - if (IS_ERR(th)) - GOTO(out_nolock, rc = PTR_ERR(th)); + now = ktime_get_real_seconds(); lqe_write_lock(lqe); - LQUOTA_DEBUG(lqe, "setquota valid:%x hard:"LPU64" soft:"LPU64 - " time:"LPU64, valid, hard, soft, time); + LQUOTA_DEBUG(lqe, "changing quota settings valid:%x hard:%llu soft:" + "%llu time:%llu", valid, hard, soft, time); + + if (is_default && lqe->lqe_id.qid_uid != 0) { + LQUOTA_DEBUG(lqe, "set qid %llu to use default quota setting", + lqe->lqe_id.qid_uid); + + qmt_lqe_set_default(env, lqe->lqe_site->lqs_parent, lqe, false); + GOTO(quota_set, 0); + } if ((valid & QIF_TIMES) != 0 && lqe->lqe_gracetime != time) { /* change time settings */ @@ -237,8 +159,6 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt, if ((valid & QIF_LIMITS) != 0 && (lqe->lqe_hardlimit != hard || lqe->lqe_softlimit != soft)) { - bool enforced = lqe->lqe_enforced; - rc = qmt_validate_limits(lqe, hard, soft); if (rc) GOTO(out, rc); @@ -247,6 +167,10 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt, lqe->lqe_hardlimit = hard; lqe->lqe_softlimit = soft; +quota_set: + /* recompute qunit in case it was never initialized */ + qmt_revalidate(env, lqe); + /* clear grace time */ if (lqe->lqe_softlimit == 0 || lqe->lqe_granted <= lqe->lqe_softlimit) @@ -255,46 +179,110 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt, lqe->lqe_gracetime = 0; else if ((valid & QIF_TIMES) == 0) /* set grace only if user hasn't provided his own */ - lqe->lqe_gracetime = cfs_time_current_sec() + grace; + lqe->lqe_gracetime = now + qmt_lqe_grace(lqe); /* change enforced status based on new parameters */ - if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0) + if (lqe->lqe_id.qid_uid == 0 || (lqe->lqe_hardlimit == 0 && + lqe->lqe_softlimit == 0)) lqe->lqe_enforced = false; else lqe->lqe_enforced = true; - if ((enforced && !lqe->lqe_enforced) || - (!enforced && lqe->lqe_enforced)) - /* if enforced status has changed, we need to inform - * slave, therefore we need to bump the version */ - bump_version = true; + dirtied = true; + } + + if (!is_default && lqe->lqe_is_default) { + LQUOTA_DEBUG(lqe, "the qid %llu has been set quota" + " explicitly, clear the default flag", + lqe->lqe_id.qid_uid); + qmt_lqe_clear_default(lqe); dirtied = true; } if (dirtied) { - /* write new quota settings to disk */ - rc = qmt_glb_write(env, th, lqe, - bump_version ? 
LQUOTA_BUMP_VER : 0, &ver); - if (rc) { - /* restore initial quota settings */ - qmt_restore(lqe, &qti->qti_restore); - GOTO(out, rc); + if (!is_updated) { + /* write new quota settings to disk */ + rc = qmt_glb_write(env, th, lqe, LQUOTA_BUMP_VER, &ver); + if (rc) { + /* restore initial quota settings */ + qmt_restore(lqe, &qti->qti_restore); + GOTO(out, rc); + } + } else { + ver = dt_version_get(env, LQE_GLB_OBJ(lqe)); } + + /* compute new qunit value now that we have modified the quota + * settings */ + qmt_adjust_qunit(env, lqe); + + /* clear/set edquot flag as needed */ + qmt_adjust_edquot(lqe, now); } EXIT; out: lqe_write_unlock(lqe); -out_nolock: - lqe_putref(lqe); +out_nolock: if (th != NULL && !IS_ERR(th)) dt_trans_stop(env, qmt->qmt_child, th); + if (rc == 0 && dirtied) { + qmt_glb_lock_notify(env, lqe, ver); + if (lqe->lqe_id.qid_uid == 0) { + struct qmt_entry_iter_data iter_data; + + LQUOTA_DEBUG(lqe, "notify all lqe with default quota"); + iter_data.qeid_env = env; + iter_data.qeid_qmt = qmt; + cfs_hash_for_each_safe(lqe->lqe_site->lqs_hash, + qmt_entry_iter_cb, &iter_data); + } + } + return rc; } /* + * Update quota settings for a given identifier. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or + * inode (i.e. LQUOTA_RES_MD) + * \param qtype - is the quota type + * \param id - is the quota indentifier for which we want to modify + * quota settings. + * \param hard - is the new hard limit + * \param soft - is the new soft limit + * \param time - is the new grace time + * \param valid - is the list of settings to change + * \param is_default - true for default quota setting + * \param is_updated - true if the lqe is updated and no need to write back + */ +static int qmt_set(const struct lu_env *env, struct qmt_device *qmt, + __u8 restype, __u8 qtype, union lquota_id *id, + __u64 hard, __u64 soft, __u64 time, __u32 valid, + bool is_default, bool is_updated) +{ + struct lquota_entry *lqe; + int rc; + ENTRY; + + /* look-up quota entry associated with this ID */ + lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, id); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + rc = qmt_set_with_lqe(env, qmt, lqe, hard, soft, time, valid, + is_default, is_updated); + + lqe_putref(lqe); + RETURN(rc); +} + +/* * Handle quotactl request. * * \param env - is the environment passed by the caller @@ -309,62 +297,68 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, struct qmt_device *qmt = lu2qmt_dev(ld); struct obd_dqblk *dqb = &oqctl->qc_dqblk; int rc = 0; + bool is_default = false; ENTRY; LASSERT(qmt != NULL); - if (oqctl->qc_type >= MAXQUOTAS) + if (oqctl->qc_type >= LL_MAXQUOTAS) /* invalid quota type */ RETURN(-EINVAL); switch (oqctl->qc_cmd) { case Q_GETINFO: /* read grace times */ + /* Global grace time is stored in quota settings of ID 0. 
*/ + id->qid_uid = 0; + /* read inode grace time */ - rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type, - &oqctl->qc_dqinfo.dqi_igrace); + rc = qmt_get(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, id, + NULL, NULL, &oqctl->qc_dqinfo.dqi_igrace, false); if (rc) break; /* read block grace time */ - rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type, - &oqctl->qc_dqinfo.dqi_bgrace); + rc = qmt_get(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, id, + NULL, NULL, &oqctl->qc_dqinfo.dqi_bgrace, false); break; case Q_SETINFO: /* modify grace times */ /* setinfo should be using dqi->dqi_valid, but lfs incorrectly * sets the valid flags in dqb->dqb_valid instead, try to live * with that ... */ + + /* Global grace time is stored in quota settings of ID 0. */ + id->qid_uid = 0; + if ((dqb->dqb_valid & QIF_ITIME) != 0) { /* set inode grace time */ - rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_MD, - oqctl->qc_type, - oqctl->qc_dqinfo.dqi_igrace); + rc = qmt_set(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, + id, 0, 0, oqctl->qc_dqinfo.dqi_igrace, + QIF_TIMES, false, false); if (rc) break; } if ((dqb->dqb_valid & QIF_BTIME) != 0) /* set block grace time */ - rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_DT, - oqctl->qc_type, - oqctl->qc_dqinfo.dqi_bgrace); + rc = qmt_set(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, + id, 0, 0, oqctl->qc_dqinfo.dqi_bgrace, + QIF_TIMES, false, false); break; + case LUSTRE_Q_GETDEFAULT: + is_default = true; + /* fallthrough */ + case Q_GETQUOTA: /* consult quota limit */ - /* There is no quota limit for root user & group */ - if (oqctl->qc_id == 0) { - memset(dqb, 0, sizeof(*dqb)); - dqb->dqb_valid = QIF_LIMITS | QIF_TIMES; - break; - } /* extract quota ID from quotactl request */ id->qid_uid = oqctl->qc_id; /* look-up inode quota settings */ - rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type, - id, &dqb->dqb_ihardlimit, - &dqb->dqb_isoftlimit, &dqb->dqb_itime); + rc = qmt_get(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, id, + &dqb->dqb_ihardlimit, &dqb->dqb_isoftlimit, + &dqb->dqb_itime, is_default); if (rc) break; @@ -373,9 +367,9 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, dqb->dqb_curinodes = 0; /* look-up block quota settings */ - rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type, - id, &dqb->dqb_bhardlimit, - &dqb->dqb_bsoftlimit, &dqb->dqb_btime); + rc = qmt_get(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, id, + &dqb->dqb_bhardlimit, &dqb->dqb_bsoftlimit, + &dqb->dqb_btime, is_default); if (rc) break; @@ -384,40 +378,34 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, dqb->dqb_curspace = 0; break; + case LUSTRE_Q_SETDEFAULT: + is_default = true; + /* fallthrough */ + case Q_SETQUOTA: /* change quota limits */ - if (oqctl->qc_id == 0) - /* can't enforce a quota limit for root user & group */ - RETURN(-EPERM); /* extract quota ID from quotactl request */ id->qid_uid = oqctl->qc_id; if ((dqb->dqb_valid & QIF_IFLAGS) != 0) { /* update inode quota settings */ - rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_MD, - oqctl->qc_type, id, - dqb->dqb_ihardlimit, - dqb->dqb_isoftlimit, dqb->dqb_itime, - dqb->dqb_valid & QIF_IFLAGS); + rc = qmt_set(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, + id, dqb->dqb_ihardlimit, + dqb->dqb_isoftlimit, dqb->dqb_itime, + dqb->dqb_valid & QIF_IFLAGS, is_default, + false); if (rc) break; } if ((dqb->dqb_valid & QIF_BFLAGS) != 0) /* update block quota settings */ - rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_DT, - oqctl->qc_type, id, - dqb->dqb_bhardlimit, - dqb->dqb_bsoftlimit, 
dqb->dqb_btime, - dqb->dqb_valid & QIF_BFLAGS); + rc = qmt_set(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, + id, dqb->dqb_bhardlimit, + dqb->dqb_bsoftlimit, dqb->dqb_btime, + dqb->dqb_valid & QIF_BFLAGS, is_default, + false); break; - case Q_QUOTAON: - case Q_QUOTAOFF: /* quota is always turned on on the master */ - RETURN(0); - - case LUSTRE_Q_INVALIDATE: /* not supported any more */ - RETURN(-ENOTSUPP); - default: CERROR("%s: unsupported quotactl command: %d\n", qmt->qmt_svname, oqctl->qc_cmd); @@ -428,6 +416,255 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, } /* + * Helper function to handle quota request from slave. + * + * \param env - is the environment passed by the caller + * \param lqe - is the lquota_entry subject to the quota request + * \param qmt - is the master device + * \param uuid - is the uuid associated with the slave + * \param qb_flags - are the quota request flags as packed in the quota_body + * \param qb_count - is the amount of quota space the slave wants to + * acquire/release + * \param qb_usage - is the current space usage on the slave + * \param repbody - is the quota_body of reply + * + * \retval 0 : success + * \retval -EDQUOT : out of quota + * -EINPROGRESS : inform client to retry write/create + * -ve : other appropriate errors + */ +int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe, + struct qmt_device *qmt, struct obd_uuid *uuid, __u32 qb_flags, + __u64 qb_count, __u64 qb_usage, struct quota_body *repbody) +{ + struct qmt_thread_info *qti = qmt_info(env); + __u64 now, count; + struct dt_object *slv_obj = NULL; + __u64 slv_granted, slv_granted_bck; + struct thandle *th = NULL; + int rc, ret; + ENTRY; + + LASSERT(uuid != NULL); + + /* initialize reply */ + memset(repbody, 0, sizeof(*repbody)); + memcpy(&repbody->qb_id, &lqe->lqe_id, sizeof(repbody->qb_id)); + + if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RECOVERABLE_ERR)) + RETURN(-cfs_fail_val); + + /* look-up index file associated with acquiring slave */ + slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, LQE_ROOT(lqe), + lu_object_fid(&LQE_GLB_OBJ(lqe)->do_lu), + uuid); + if (IS_ERR(slv_obj)) + GOTO(out, rc = PTR_ERR(slv_obj)); + + /* pack slave fid in reply just for sanity check */ + memcpy(&repbody->qb_slv_fid, lu_object_fid(&slv_obj->do_lu), + sizeof(struct lu_fid)); + + /* allocate & start transaction with enough credits to update + * global & slave indexes */ + th = qmt_trans_start_with_slv(env, lqe, slv_obj, &qti->qti_restore); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + lqe_write_lock(lqe); + LQUOTA_DEBUG(lqe, "dqacq starts uuid:%s flags:0x%x wanted:%llu" + " usage:%llu", obd_uuid2str(uuid), qb_flags, qb_count, + qb_usage); + + /* Legal race, limits have been removed on master, but slave didn't + * receive the change yet. Just return EINPROGRESS until the slave gets + * notified. 
*/ + if (!lqe->lqe_enforced && !req_is_rel(qb_flags)) + GOTO(out_locked, rc = -ESRCH); + + /* recompute qunit in case it was never initialized */ + qmt_revalidate(env, lqe); + + /* slave just wants to acquire per-ID lock */ + if (req_is_acq(qb_flags) && qb_count == 0) + GOTO(out_locked, rc = 0); + + /* fetch how much quota space is already granted to this slave */ + rc = qmt_slv_read(env, lqe, slv_obj, &slv_granted); + if (rc) { + LQUOTA_ERROR(lqe, "Failed to get granted for slave %s, rc=%d", + obd_uuid2str(uuid), rc); + GOTO(out_locked, rc); + } + /* recall how much space this slave currently owns in order to restore + * it in case of failure */ + slv_granted_bck = slv_granted; + + /* record current time for soft limit & grace time management */ + now = ktime_get_real_seconds(); + + if (req_is_rel(qb_flags)) { + /* Slave would like to release quota space */ + if (slv_granted < qb_count || + lqe->lqe_granted < qb_count) { + /* can't release more than granted */ + LQUOTA_ERROR(lqe, "Release too much! uuid:%s release:" + "%llu granted:%llu, total:%llu", + obd_uuid2str(uuid), qb_count, + slv_granted, lqe->lqe_granted); + GOTO(out_locked, rc = -EINVAL); + } + + repbody->qb_count = qb_count; + /* put released space back to global pool */ + QMT_REL(lqe, slv_granted, qb_count); + GOTO(out_write, rc = 0); + } + + if (req_has_rep(qb_flags) && slv_granted < qb_usage) { + /* Slave is reporting space usage in quota request and it turns + * out to be using more quota space than owned, so we adjust + * granted space regardless of the current state of affairs */ + repbody->qb_count = qb_usage - slv_granted; + QMT_GRANT(lqe, slv_granted, repbody->qb_count); + } + + if (!req_is_acq(qb_flags) && !req_is_preacq(qb_flags)) + GOTO(out_write, rc = 0); + + qmt_adjust_edquot(lqe, now); + if (lqe->lqe_edquot) + /* no hope to claim further space back */ + GOTO(out_write, rc = -EDQUOT); + + if (qmt_space_exhausted(lqe, now)) { + /* might have some free space once rebalancing is completed */ + rc = req_is_acq(qb_flags) ? -EINPROGRESS : -EDQUOT; + GOTO(out_write, rc); + } + + if (req_is_preacq(qb_flags)) { + /* slave would like to pre-acquire quota space. To do so, it + * reports in qb_count how much spare quota space it owns and we + * can grant back quota space which is consistent with qunit + * value. */ + + if (qb_count >= lqe->lqe_qunit) + /* slave already own the maximum it should */ + GOTO(out_write, rc = 0); + + count = qmt_alloc_expand(lqe, slv_granted, qb_count); + if (count == 0) + GOTO(out_write, rc = -EDQUOT); + + repbody->qb_count += count; + QMT_GRANT(lqe, slv_granted, count); + GOTO(out_write, rc = 0); + } + + /* processing acquire request with clients waiting */ + if (lqe->lqe_hardlimit != 0 && + lqe->lqe_granted + qb_count > lqe->lqe_hardlimit) { + /* cannot grant as much as asked, but can still afford to grant + * some quota space back */ + count = lqe->lqe_hardlimit - lqe->lqe_granted; + repbody->qb_count += count; + QMT_GRANT(lqe, slv_granted, count); + GOTO(out_write, rc = 0); + } + + /* Whouhou! we can satisfy the slave request! */ + repbody->qb_count += qb_count; + QMT_GRANT(lqe, slv_granted, qb_count); + + /* Try to expand the acquired count for DQACQ */ + count = qmt_alloc_expand(lqe, slv_granted, 0); + if (count != 0) { + /* can even grant more than asked, it is like xmas ... 
*/ + repbody->qb_count += count; + QMT_GRANT(lqe, slv_granted, count); + GOTO(out_write, rc = 0); + } + + GOTO(out_write, rc = 0); +out_write: + if (repbody->qb_count == 0) + GOTO(out_locked, rc); + + /* start/stop grace timer if required */ + if (lqe->lqe_softlimit != 0) { + if (lqe->lqe_granted > lqe->lqe_softlimit && + lqe->lqe_gracetime == 0) + /* first time over soft limit, let's start grace + * timer */ + lqe->lqe_gracetime = now + qmt_lqe_grace(lqe); + else if (lqe->lqe_granted <= lqe->lqe_softlimit && + lqe->lqe_gracetime != 0) + /* Clear grace timer */ + lqe->lqe_gracetime = 0; + } + + /* Update slave index first since it is easier to roll back */ + ret = qmt_slv_write(env, th, lqe, slv_obj, LQUOTA_BUMP_VER, + &repbody->qb_slv_ver, slv_granted); + if (ret) { + /* restore initial quota settings */ + qmt_restore(lqe, &qti->qti_restore); + /* reset qb_count */ + repbody->qb_count = 0; + GOTO(out_locked, rc = ret); + } + + /* Update global index, no version bump needed */ + ret = qmt_glb_write(env, th, lqe, 0, NULL); + if (ret) { + rc = ret; + /* restore initial quota settings */ + qmt_restore(lqe, &qti->qti_restore); + /* reset qb_count */ + repbody->qb_count = 0; + + /* restore previous granted value */ + ret = qmt_slv_write(env, th, lqe, slv_obj, 0, NULL, + slv_granted_bck); + if (ret) { + LQUOTA_ERROR(lqe, "failed to restore initial slave " + "value rc:%d ret%d", rc, ret); + LBUG(); + } + qmt_adjust_edquot(lqe, now); + GOTO(out_locked, rc); + } + + /* Total granted has been changed, let's try to adjust the qunit + * size according to the total granted & limits. */ + qmt_adjust_qunit(env, lqe); + + /* clear/set edquot flag and notify slaves via glimpse if needed */ + qmt_adjust_edquot(lqe, now); +out_locked: + LQUOTA_DEBUG(lqe, "dqacq ends count:%llu ver:%llu rc:%d", + repbody->qb_count, repbody->qb_slv_ver, rc); + lqe_write_unlock(lqe); +out: + if (th != NULL && !IS_ERR(th)) + dt_trans_stop(env, qmt->qmt_child, th); + + if (slv_obj != NULL && !IS_ERR(slv_obj)) + dt_object_put(env, slv_obj); + + if ((req_is_acq(qb_flags) || req_is_preacq(qb_flags)) && + OBD_FAIL_CHECK(OBD_FAIL_QUOTA_EDQUOT)) { + /* introduce inconsistency between granted value in slave index + * and slave index copy of slave */ + repbody->qb_count = 0; + rc = -EDQUOT; + } + + RETURN(rc); +} + +/* * Handle quota request from slave. 
* * \param env - is the environment passed by the caller @@ -437,7 +674,13 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld, struct ptlrpc_request *req) { + struct qmt_device *qmt = lu2qmt_dev(ld); struct quota_body *qbody, *repbody; + struct obd_uuid *uuid; + struct ldlm_lock *lock; + struct lquota_entry *lqe; + int pool_type, qtype; + int rc; ENTRY; qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY); @@ -448,9 +691,82 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld, if (repbody == NULL) RETURN(err_serious(-EFAULT)); - /* XXX: to be implemented */ + /* verify if global lock is stale */ + if (!lustre_handle_is_used(&qbody->qb_glb_lockh)) + RETURN(-ENOLCK); - RETURN(0); + lock = ldlm_handle2lock(&qbody->qb_glb_lockh); + if (lock == NULL) + RETURN(-ENOLCK); + LDLM_LOCK_PUT(lock); + + uuid = &req->rq_export->exp_client_uuid; + + if (req_is_rel(qbody->qb_flags) + req_is_acq(qbody->qb_flags) + + req_is_preacq(qbody->qb_flags) > 1) { + CERROR("%s: malformed quota request with conflicting flags set " + "(%x) from slave %s\n", qmt->qmt_svname, + qbody->qb_flags, obd_uuid2str(uuid)); + RETURN(-EPROTO); + } + + if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) { + /* acquire and pre-acquire should use a valid ID lock */ + + if (!lustre_handle_is_used(&qbody->qb_lockh)) + RETURN(-ENOLCK); + + lock = ldlm_handle2lock(&qbody->qb_lockh); + if (lock == NULL) + /* no lock associated with this handle */ + RETURN(-ENOLCK); + + LDLM_DEBUG(lock, "%sacquire request", + req_is_preacq(qbody->qb_flags) ? "pre" : ""); + + if (!obd_uuid_equals(&lock->l_export->exp_client_uuid, uuid)) { + /* sorry, no way to cheat ... */ + LDLM_LOCK_PUT(lock); + RETURN(-ENOLCK); + } + + if (ldlm_is_ast_sent(lock)) { + struct ptlrpc_service_part *svc; + time64_t timeout; + + svc = req->rq_rqbd->rqbd_svcpt; + timeout = at_est2timeout(at_get(&svc->scp_at_estimate)); + timeout += (ldlm_bl_timeout(lock) >> 1); + + /* lock is being cancelled, prolong timeout */ + ldlm_refresh_waiting_lock(lock, timeout); + } + LDLM_LOCK_PUT(lock); + } + + /* extract quota information from global index FID packed in the + * request */ + rc = lquota_extract_fid(&qbody->qb_fid, &pool_type, &qtype); + if (rc) + RETURN(-EINVAL); + + /* Find the quota entry associated with the quota id */ + lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype, + &qbody->qb_id); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + /* process quota request */ + rc = qmt_dqacq0(env, lqe, qmt, uuid, qbody->qb_flags, qbody->qb_count, + qbody->qb_usage, repbody); + + if (lustre_handle_is_used(&qbody->qb_lockh)) + /* return current qunit value only to slaves owning an per-ID + * quota lock. For enqueue, the qunit value will be returned in + * the LVB */ + repbody->qb_qunit = lqe->lqe_qunit; + lqe_putref(lqe); + RETURN(rc); } /* Vector of quota request handlers. This vector is used by the MDT to forward
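
As a companion to the grant accounting introduced in qmt_dqacq0() above, here is a minimal standalone sketch of the bookkeeping idea: a global granted counter shared by all slaves, a per-slave counter, releases bounded by what the slave actually owns, acquires capped by the hard limit, and pre-acquires that top the slave up to one qunit. It is plain C written for illustration only; the type and function names (simple_lqe, do_release, do_acquire, do_preacquire) are invented stand-ins, not Lustre APIs, and the sketch deliberately ignores the locking, transactions and index writes that the real handler performs.

#include <stdio.h>
#include <stdint.h>

/* Simplified stand-in for struct lquota_entry: no locking, no disk state. */
struct simple_lqe {
	uint64_t hardlimit;	/* 0 means "no hard limit" */
	uint64_t granted;	/* space granted to all slaves combined */
	uint64_t qunit;		/* preferred per-slave allocation unit */
};

/* Release space a slave previously acquired (cf. QMT_REL). */
static int do_release(struct simple_lqe *lqe, uint64_t *slv_granted,
		      uint64_t count)
{
	if (count > *slv_granted || count > lqe->granted)
		return -1;	/* cannot release more than was granted */
	*slv_granted -= count;
	lqe->granted -= count;
	return 0;
}

/* Grant as much of the request as the hard limit allows (cf. QMT_GRANT). */
static uint64_t do_acquire(struct simple_lqe *lqe, uint64_t *slv_granted,
			   uint64_t count)
{
	if (lqe->hardlimit != 0) {
		uint64_t room = lqe->granted >= lqe->hardlimit ?
				0 : lqe->hardlimit - lqe->granted;
		if (count > room)
			count = room;
	}
	*slv_granted += count;
	lqe->granted += count;
	return count;		/* amount actually granted, possibly 0 */
}

/* Pre-acquire: top the slave's spare space up to one qunit. */
static uint64_t do_preacquire(struct simple_lqe *lqe, uint64_t *slv_granted,
			      uint64_t spare)
{
	if (spare >= lqe->qunit)
		return 0;	/* slave already owns as much as it should */
	return do_acquire(lqe, slv_granted, lqe->qunit - spare);
}

int main(void)
{
	struct simple_lqe lqe = { .hardlimit = 1000, .granted = 0,
				  .qunit = 128 };
	uint64_t slv = 0;

	printf("acquired:     %llu\n",
	       (unsigned long long)do_acquire(&lqe, &slv, 200));
	printf("pre-acquired: %llu\n",
	       (unsigned long long)do_preacquire(&lqe, &slv, 10));
	do_release(&lqe, &slv, 64);
	printf("granted total %llu, this slave %llu\n",
	       (unsigned long long)lqe.granted, (unsigned long long)slv);
	return 0;
}

The real handler additionally restarts or clears the grace timer around the soft limit, bumps the slave index version, adjusts qunit and the edquot flag, and rolls the grant back if either index update fails; none of that is modeled in this sketch.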