X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fqmt_handler.c;h=5e7f2f5e5a3fcd2d88cc47dd8b71e59390ff8701;hb=refs%2Fchanges%2F73%2F39873%2F18;hp=dc3f280dbff922177b8f354bf487c9c006f193bb;hpb=51435cfffcd6815e70fb46b0ec2edcac3327bf44;p=fs%2Flustre-release.git diff --git a/lustre/quota/qmt_handler.c b/lustre/quota/qmt_handler.c index dc3f280..5e7f2f5 100644 --- a/lustre/quota/qmt_handler.c +++ b/lustre/quota/qmt_handler.c @@ -21,210 +21,171 @@ * GPL HEADER END */ /* - * Copyright (c) 2012 Intel, Inc. + * Copyright (c) 2012, 2017, Intel Corporation. * Use is subject to license terms. * * Author: Johann Lombardi * Author: Niu Yawei */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #define DEBUG_SUBSYSTEM S_LQUOTA #include #include "qmt_internal.h" /* - * Fetch grace time for either inode or block. + * Retrieve quota settings for a given identifier. * * \param env - is the environment passed by the caller * \param qmt - is the quota master target - * \param pool_id - is the 16-bit pool identifier * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode * (i.e. LQUOTA_RES_MD) * \param qtype - is the quota type + * \param id - is the quota indentifier for which we want to acces quota + * settings. + * \param hard - is the output variable where to copy the hard limit + * \param soft - is the output variable where to copy the soft limit * \param time - is the output variable where to copy the grace time */ -static int qmt_getinfo(const struct lu_env *env, struct qmt_device *qmt, - __u16 pool_id, __u8 restype, __u8 qtype, __u64 *time) +static int qmt_get(const struct lu_env *env, struct qmt_device *qmt, + __u8 restype, __u8 qtype, union lquota_id *id, + __u64 *hard, __u64 *soft, __u64 *time, bool is_default, + char *pool_name) { - struct qmt_thread_info *qti = qmt_info(env); - union lquota_id *id = &qti->qti_id; struct lquota_entry *lqe; ENTRY; - /* Global grace time is stored in quota settings of ID 0. */ - id->qid_uid = 0; + LASSERT(!is_default || id->qid_uid == 0); + if (pool_name && !strnlen(pool_name, LOV_MAXPOOLNAME)) + pool_name = NULL; - /* look-up quota entry storing grace time */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); + /* look-up lqe structure containing quota settings */ + lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, id, pool_name); if (IS_ERR(lqe)) RETURN(PTR_ERR(lqe)); + /* copy quota settings */ lqe_read_lock(lqe); - LQUOTA_DEBUG(lqe, "getinfo"); - /* copy grace time */ - *time = lqe->lqe_gracetime; + LQUOTA_DEBUG(lqe, "fetch settings"); + if (hard != NULL) + *hard = lqe->lqe_hardlimit; + if (soft != NULL) + *soft = lqe->lqe_softlimit; + if (time != NULL) { + *time = lqe->lqe_gracetime; + if (lqe->lqe_is_default) + *time |= (__u64)LQUOTA_FLAG_DEFAULT << + LQUOTA_GRACE_BITS; + } lqe_read_unlock(lqe); lqe_putref(lqe); RETURN(0); } -/* - * Update grace time for either inode or block. - * Global grace time is stored in quota settings of ID 0. - * - * \param env - is the environment passed by the caller - * \param qmt - is the quota master target - * \param pool_id - is the 16-bit pool identifier - * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode - * (i.e. LQUOTA_RES_MD) - * \param qtype - is the quota type - * \param time - is the new grace time - */ -static int qmt_setinfo(const struct lu_env *env, struct qmt_device *qmt, - __u16 pool_id, __u8 restype, __u8 qtype, __u64 time) +struct qmt_entry_iter_data { + const struct lu_env *qeid_env; + struct qmt_device *qeid_qmt; +}; + +static int qmt_entry_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, + struct hlist_node *hnode, void *d) { - struct qmt_thread_info *qti = qmt_info(env); - union lquota_id *id = &qti->qti_id; + struct qmt_entry_iter_data *iter = (struct qmt_entry_iter_data *)d; struct lquota_entry *lqe; - struct thandle *th = NULL; - int rc; - ENTRY; - - /* Global grace time is stored in quota settings of ID 0. */ - id->qid_uid = 0; - - /* look-up quota entry storing the global grace time */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); - if (IS_ERR(lqe)) - RETURN(PTR_ERR(lqe)); - /* allocate & start transaction with enough credits to update grace - * time in the global index file */ - th = qmt_trans_start(env, lqe, &qti->qti_restore); - if (IS_ERR(th)) - GOTO(out_nolock, rc = PTR_ERR(th)); - - /* write lock quota entry storing the grace time */ - lqe_write_lock(lqe); - if (lqe->lqe_gracetime == time) - /* grace time is the same */ - GOTO(out, rc = 0); - - LQUOTA_DEBUG(lqe, "setinfo time:"LPU64, time); + lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash); + LASSERT(atomic_read(&lqe->lqe_ref) > 0); - /* set new grace time */ - lqe->lqe_gracetime = time; - /* always set enforced bit for ID 0 to make sure it does not go away */ - lqe->lqe_enforced = true; + if (lqe->lqe_id.qid_uid == 0 || !lqe->lqe_is_default) + return 0; - /* write new grace time to disk, no need for version bump */ - rc = qmt_glb_write(env, th, lqe, 0, NULL); - if (rc) { - /* restore initial grace time */ - qmt_restore(lqe, &qti->qti_restore); - GOTO(out, rc); - } - EXIT; -out: - lqe_write_unlock(lqe); -out_nolock: - lqe_putref(lqe); - if (th != NULL && !IS_ERR(th)) - dt_trans_stop(env, qmt->qmt_child, th); - return rc; + return qmt_set_with_lqe(iter->qeid_env, iter->qeid_qmt, lqe, 0, 0, 0, 0, + true, true); } -/* - * Retrieve quota settings for a given identifier. - * - * \param env - is the environment passed by the caller - * \param qmt - is the quota master target - * \param pool_id - is the 16-bit pool identifier - * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode - * (i.e. LQUOTA_RES_MD) - * \param qtype - is the quota type - * \param id - is the quota indentifier for which we want to acces quota - * settings. - * \param hard - is the output variable where to copy the hard limit - * \param soft - is the output variable where to copy the soft limit - * \param time - is the output variable where to copy the grace time - */ -static int qmt_getquota(const struct lu_env *env, struct qmt_device *qmt, - __u16 pool_id, __u8 restype, __u8 qtype, - union lquota_id *id, __u64 *hard, __u64 *soft, - __u64 *time) +static void qmt_set_id_notify(const struct lu_env *env, struct qmt_device *qmt, + struct lquota_entry *lqe) { - struct lquota_entry *lqe; - ENTRY; - - /* look-up lqe structure containing quota settings */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); - if (IS_ERR(lqe)) - RETURN(PTR_ERR(lqe)); - - /* copy quota settings */ - lqe_read_lock(lqe); - LQUOTA_DEBUG(lqe, "getquota"); - *hard = lqe->lqe_hardlimit; - *soft = lqe->lqe_softlimit; - *time = lqe->lqe_gracetime; - lqe_read_unlock(lqe); - - lqe_putref(lqe); - RETURN(0); + struct lquota_entry *lqe_gl; + int rc; + + lqe_gl = lqe->lqe_is_global ? lqe : NULL; + rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe), + lqe_qtype(lqe), &lqe->lqe_id); + if (!qti_lqes_cnt(env)) + GOTO(lqes_fini, rc); + + if (!lqe_gl && qti_lqes_glbl(env)->lqe_is_global) + lqe_gl = qti_lqes_glbl(env); + + if (!lqe_gl) + GOTO(lqes_fini, rc); + + if (lqe_gl->lqe_glbl_data) + qmt_seed_glbe(env, lqe_gl->lqe_glbl_data); + /* Even if slaves haven't enqueued quota lock yet, + * it is needed to set lqe_revoke_time in qmt_id_lock_glimpse + * in case of reaching qpi_least_qunit */ + qmt_id_lock_notify(qmt, lqe_gl); +lqes_fini: + qti_lqes_fini(env); } /* - * Update quota settings for a given identifier. + * Update quota settings for a given lqe. * - * \param env - is the environment passed by the caller - * \param qmt - is the quota master target - * \param pool_id - is the 16-bit pool identifier - * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode - * (i.e. LQUOTA_RES_MD) - * \param qtype - is the quota type - * \param id - is the quota indentifier for which we want to modify quota - * settings. - * \param hard - is the new hard limit - * \param soft - is the new soft limit - * \param time - is the new grace time - * \param valid - is the list of settings to change + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param lqe - is the lquota_entry for which we want to modify quota + * settings. + * \param hard - is the new hard limit + * \param soft - is the new soft limit + * \param time - is the new grace time + * \param valid - is the list of settings to change + * \param is_default - true for default quota setting + * \param is_updated - true if the lqe is updated and no need to write back */ -static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt, - __u16 pool_id, __u8 restype, __u8 qtype, - union lquota_id *id, __u64 hard, __u64 soft, __u64 time, - __u32 valid) + +int qmt_set_with_lqe(const struct lu_env *env, struct qmt_device *qmt, + struct lquota_entry *lqe, __u64 hard, __u64 soft, + __u64 time, __u32 valid, bool is_default, bool is_updated) { - struct qmt_thread_info *qti = qmt_info(env); - struct lquota_entry *lqe; - struct thandle *th = NULL; - __u64 ver, now; - bool dirtied = false, bump_version = false; - int rc = 0; + struct thandle *th = NULL; + time64_t now = 0; + __u64 ver; + bool dirtied = false; + int rc = 0; + bool need_id_notify = false; ENTRY; - /* look-up quota entry associated with this ID */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); - if (IS_ERR(lqe)) - RETURN(PTR_ERR(lqe)); - - /* allocate & start transaction with enough credits to update quota - * settings in the global index file */ - th = qmt_trans_start(env, lqe, &qti->qti_restore); - if (IS_ERR(th)) - GOTO(out_nolock, rc = PTR_ERR(th)); + /* need to write back to global quota file? */ + if (!is_updated) { + /* By default we should have here only 1 lqe, + * so no allocations should be done. */ + if (qti_lqes_restore_init(env)) + GOTO(out_nolock, rc = -ENOMEM); + /* allocate & start transaction with enough credits to update + * quota settings in the global index file */ + th = qmt_trans_start(env, lqe); + if (IS_ERR(th)) + GOTO(out_nolock, rc = PTR_ERR(th)); + } - now = cfs_time_current_sec(); + now = ktime_get_real_seconds(); lqe_write_lock(lqe); - LQUOTA_DEBUG(lqe, "setquota valid:%x hard:"LPU64" soft:"LPU64 - " time:"LPU64, valid, hard, soft, time); + LQUOTA_DEBUG(lqe, + "changing quota settings valid:%x hard:%llu soft:%llu time:%llu", + valid, hard, soft, time); + + if (is_default && lqe->lqe_id.qid_uid != 0) { + LQUOTA_DEBUG(lqe, "set qid %llu to use default quota setting", + lqe->lqe_id.qid_uid); + + qmt_lqe_set_default(env, lqe->lqe_site->lqs_parent, lqe, false); + GOTO(quota_set, 0); + } if ((valid & QIF_TIMES) != 0 && lqe->lqe_gracetime != time) { /* change time settings */ @@ -234,19 +195,19 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt, if ((valid & QIF_LIMITS) != 0 && (lqe->lqe_hardlimit != hard || lqe->lqe_softlimit != soft)) { - bool enforced = lqe->lqe_enforced; - rc = qmt_validate_limits(lqe, hard, soft); if (rc) GOTO(out, rc); - /* recompute qunit in case it was never initialized */ - qmt_revalidate(env, lqe); - /* change quota limits */ lqe->lqe_hardlimit = hard; lqe->lqe_softlimit = soft; +quota_set: + /* recompute qunit in case it was never initialized */ + if (qmt_revalidate(env, lqe)) + need_id_notify = true; + /* clear grace time */ if (lqe->lqe_softlimit == 0 || lqe->lqe_granted <= lqe->lqe_softlimit) @@ -258,53 +219,114 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt, lqe->lqe_gracetime = now + qmt_lqe_grace(lqe); /* change enforced status based on new parameters */ - if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0) + if (lqe->lqe_id.qid_uid == 0 || (lqe->lqe_hardlimit == 0 && + lqe->lqe_softlimit == 0)) lqe->lqe_enforced = false; else lqe->lqe_enforced = true; - if ((enforced && !lqe->lqe_enforced) || - (!enforced && lqe->lqe_enforced)) - /* if enforced status has changed, we need to inform - * slave, therefore we need to bump the version */ - bump_version = true; + dirtied = true; + } + + if (!is_default && lqe->lqe_is_default) { + LQUOTA_DEBUG(lqe, "the qid %llu has been set quota" + " explicitly, clear the default flag", + lqe->lqe_id.qid_uid); + qmt_lqe_clear_default(lqe); dirtied = true; } if (dirtied) { - /* write new quota settings to disk */ - rc = qmt_glb_write(env, th, lqe, - bump_version ? LQUOTA_BUMP_VER : 0, &ver); - if (rc) { - /* restore initial quota settings */ - qmt_restore(lqe, &qti->qti_restore); - GOTO(out, rc); + if (!is_updated) { + /* write new quota settings to disk */ + rc = qmt_glb_write(env, th, lqe, LQUOTA_BUMP_VER, &ver); + if (rc) { + /* restore initial quota settings */ + qmt_restore(lqe, &qti_lqes_rstr(env)[0]); + GOTO(out, rc); + } + } else { + ver = dt_version_get(env, LQE_GLB_OBJ(lqe)); } /* compute new qunit value now that we have modified the quota - * settings */ - qmt_adjust_qunit(env, lqe); - - /* clear/set edquot flag as needed */ - qmt_adjust_edquot(lqe, now); + * settings or clear/set edquot flag if needed */ + need_id_notify |= qmt_adjust_qunit(env, lqe); + need_id_notify |= qmt_adjust_edquot(lqe, now); } EXIT; out: lqe_write_unlock(lqe); -out_nolock: - lqe_putref(lqe); +out_nolock: + qti_lqes_restore_fini(env); if (th != NULL && !IS_ERR(th)) dt_trans_stop(env, qmt->qmt_child, th); - if (rc == 0 && bump_version) + if (rc == 0 && dirtied) { qmt_glb_lock_notify(env, lqe, ver); + if (lqe->lqe_id.qid_uid == 0) { + struct qmt_entry_iter_data iter_data; + + LQUOTA_DEBUG(lqe, "notify all lqe with default quota"); + iter_data.qeid_env = env; + iter_data.qeid_qmt = qmt; + cfs_hash_for_each(lqe->lqe_site->lqs_hash, + qmt_entry_iter_cb, &iter_data); + /* Always notify slaves with default values. Don't + * care about overhead as will be sent only not changed + * values(see qmt_id_lock_cb for details).*/ + need_id_notify = true; + } + if (need_id_notify) + qmt_set_id_notify(env, qmt, lqe); + } return rc; } /* + * Update quota settings for a given identifier. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or + * inode (i.e. LQUOTA_RES_MD) + * \param qtype - is the quota type + * \param id - is the quota indentifier for which we want to modify + * quota settings. + * \param hard - is the new hard limit + * \param soft - is the new soft limit + * \param time - is the new grace time + * \param valid - is the list of settings to change + * \param is_default - true for default quota setting + * \param is_updated - true if the lqe is updated and no need to write back + */ +static int qmt_set(const struct lu_env *env, struct qmt_device *qmt, + __u8 restype, __u8 qtype, union lquota_id *id, + __u64 hard, __u64 soft, __u64 time, __u32 valid, + bool is_default, bool is_updated, char *pool_name) +{ + struct lquota_entry *lqe; + int rc; + ENTRY; + + if (pool_name && !strnlen(pool_name, LOV_MAXPOOLNAME)) + pool_name = NULL; + + /* look-up quota entry associated with this ID */ + lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, id, pool_name); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + rc = qmt_set_with_lqe(env, qmt, lqe, hard, soft, time, valid, + is_default, is_updated); + lqe_putref(lqe); + RETURN(rc); +} + +/* * Handle quotactl request. * * \param env - is the environment passed by the caller @@ -314,78 +336,99 @@ out_nolock: static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, struct obd_quotactl *oqctl) { - struct qmt_thread_info *qti = qmt_info(env); - union lquota_id *id = &qti->qti_id; - struct qmt_device *qmt = lu2qmt_dev(ld); - struct obd_dqblk *dqb = &oqctl->qc_dqblk; - int rc = 0; + struct qmt_thread_info *qti = qmt_info(env); + union lquota_id *id = &qti->qti_id; + struct qmt_device *qmt = lu2qmt_dev(ld); + struct obd_dqblk *dqb = &oqctl->qc_dqblk; + char *poolname; + int rc = 0; + bool is_default = false; ENTRY; LASSERT(qmt != NULL); - if (oqctl->qc_type >= MAXQUOTAS) + if (oqctl->qc_type >= LL_MAXQUOTAS) /* invalid quota type */ RETURN(-EINVAL); + poolname = LUSTRE_Q_CMD_IS_POOL(oqctl->qc_cmd) ? + oqctl->qc_poolname : NULL; + switch (oqctl->qc_cmd) { case Q_GETINFO: /* read grace times */ + case LUSTRE_Q_GETINFOPOOL: + /* Global grace time is stored in quota settings of ID 0. */ + id->qid_uid = 0; + /* read inode grace time */ - rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type, - &oqctl->qc_dqinfo.dqi_igrace); - if (rc) + rc = qmt_get(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, id, NULL, + NULL, &oqctl->qc_dqinfo.dqi_igrace, + false, poolname); + /* There could be no MD pool, so try to find DT pool */ + if (rc && rc != -ENOENT) break; /* read block grace time */ - rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type, - &oqctl->qc_dqinfo.dqi_bgrace); + rc = qmt_get(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, id, NULL, + NULL, &oqctl->qc_dqinfo.dqi_bgrace, + false, poolname); break; case Q_SETINFO: /* modify grace times */ + case LUSTRE_Q_SETINFOPOOL: /* setinfo should be using dqi->dqi_valid, but lfs incorrectly * sets the valid flags in dqb->dqb_valid instead, try to live * with that ... */ + + /* Global grace time is stored in quota settings of ID 0. */ + id->qid_uid = 0; + if ((dqb->dqb_valid & QIF_ITIME) != 0) { /* set inode grace time */ - rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_MD, - oqctl->qc_type, - oqctl->qc_dqinfo.dqi_igrace); + rc = qmt_set(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, + id, 0, 0, oqctl->qc_dqinfo.dqi_igrace, + QIF_TIMES, false, false, + poolname); if (rc) break; } if ((dqb->dqb_valid & QIF_BTIME) != 0) /* set block grace time */ - rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_DT, - oqctl->qc_type, - oqctl->qc_dqinfo.dqi_bgrace); + rc = qmt_set(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, + id, 0, 0, oqctl->qc_dqinfo.dqi_bgrace, + QIF_TIMES, false, false, + poolname); break; + case LUSTRE_Q_GETDEFAULT: + case LUSTRE_Q_GETDEFAULT_POOL: + is_default = true; + /* fallthrough */ + case Q_GETQUOTA: /* consult quota limit */ - /* There is no quota limit for root user & group */ - if (oqctl->qc_id == 0) { - memset(dqb, 0, sizeof(*dqb)); - dqb->dqb_valid = QIF_LIMITS | QIF_TIMES; - break; - } + case LUSTRE_Q_GETQUOTAPOOL: /* extract quota ID from quotactl request */ id->qid_uid = oqctl->qc_id; /* look-up inode quota settings */ - rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type, - id, &dqb->dqb_ihardlimit, - &dqb->dqb_isoftlimit, &dqb->dqb_itime); - if (rc) + rc = qmt_get(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, id, + &dqb->dqb_ihardlimit, &dqb->dqb_isoftlimit, + &dqb->dqb_itime, is_default, poolname); + /* There could be no MD pool, so try to find DT pool */ + if (rc && rc != -ENOENT) break; + else + dqb->dqb_valid |= QIF_ILIMITS | QIF_ITIME; - dqb->dqb_valid |= QIF_ILIMITS | QIF_ITIME; /* master isn't aware of actual inode usage */ dqb->dqb_curinodes = 0; /* look-up block quota settings */ - rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type, - id, &dqb->dqb_bhardlimit, - &dqb->dqb_bsoftlimit, &dqb->dqb_btime); + rc = qmt_get(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, id, + &dqb->dqb_bhardlimit, &dqb->dqb_bsoftlimit, + &dqb->dqb_btime, is_default, poolname); if (rc) break; @@ -394,40 +437,36 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, dqb->dqb_curspace = 0; break; + case LUSTRE_Q_SETDEFAULT: + case LUSTRE_Q_SETDEFAULT_POOL: + is_default = true; + /* fallthrough */ + case Q_SETQUOTA: /* change quota limits */ - if (oqctl->qc_id == 0) - /* can't enforce a quota limit for root user & group */ - RETURN(-EPERM); + case LUSTRE_Q_SETQUOTAPOOL: /* extract quota ID from quotactl request */ id->qid_uid = oqctl->qc_id; if ((dqb->dqb_valid & QIF_IFLAGS) != 0) { /* update inode quota settings */ - rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_MD, - oqctl->qc_type, id, - dqb->dqb_ihardlimit, - dqb->dqb_isoftlimit, dqb->dqb_itime, - dqb->dqb_valid & QIF_IFLAGS); + rc = qmt_set(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, + id, dqb->dqb_ihardlimit, + dqb->dqb_isoftlimit, dqb->dqb_itime, + dqb->dqb_valid & QIF_IFLAGS, is_default, + false, poolname); if (rc) break; } if ((dqb->dqb_valid & QIF_BFLAGS) != 0) /* update block quota settings */ - rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_DT, - oqctl->qc_type, id, - dqb->dqb_bhardlimit, - dqb->dqb_bsoftlimit, dqb->dqb_btime, - dqb->dqb_valid & QIF_BFLAGS); + rc = qmt_set(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, + id, dqb->dqb_bhardlimit, + dqb->dqb_bsoftlimit, dqb->dqb_btime, + dqb->dqb_valid & QIF_BFLAGS, is_default, + false, poolname); break; - case Q_QUOTAON: - case Q_QUOTAOFF: /* quota is always turned on on the master */ - RETURN(0); - - case LUSTRE_Q_INVALIDATE: /* not supported any more */ - RETURN(-ENOTSUPP); - default: CERROR("%s: unsupported quotactl command: %d\n", qmt->qmt_svname, oqctl->qc_cmd); @@ -437,11 +476,133 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, RETURN(rc); } +static inline +void qmt_grant_lqes(const struct lu_env *env, __u64 *slv, __u64 cnt) +{ + int i; + + for (i = 0; i < qti_lqes_cnt(env); i++) + qti_lqe_granted(env, i) += cnt; + + *slv += cnt; +} + +static inline bool qmt_lqes_can_rel(const struct lu_env *env, __u64 cnt) +{ + bool can_release = true; + int i; + + for (i = 0; i < qti_lqes_cnt(env); i++) { + if (cnt > qti_lqe_granted(env, i)) { + LQUOTA_ERROR(qti_lqes(env)[i], + "Can't release %llu that is larger than lqe_granted.\n", + cnt); + can_release = false; + } + } + return can_release; +} + +static inline void qmt_rel_lqes(const struct lu_env *env, __u64 *slv, __u64 cnt) +{ + int i; + + for (i = 0; i < qti_lqes_cnt(env); i++) + qti_lqe_granted(env, i) -= cnt; + + *slv -= cnt; +} + +static inline bool qmt_lqes_cannot_grant(const struct lu_env *env, __u64 cnt) +{ + bool cannot_grant = false; + int i; + + for (i = 0; i < qti_lqes_cnt(env); i++) { + if (qti_lqe_hard(env, i) != 0 && + qti_lqe_granted(env, i) + cnt > qti_lqe_hard(env, i)) { + cannot_grant = true; + break; + } + } + return cannot_grant; +} + +static inline __u64 qmt_lqes_grant_some_quota(const struct lu_env *env) +{ + __u64 min_count, tmp; + bool flag = false; + int i; + + for (i = 0, min_count = 0; i < qti_lqes_cnt(env); i++) { + if (!qti_lqes(env)[i]->lqe_enforced && + !qti_lqes(env)[i]->lqe_is_global) + continue; + + tmp = qti_lqe_hard(env, i) - qti_lqe_granted(env, i); + if (flag) { + min_count = tmp < min_count ? tmp : min_count; + } else { + flag = true; + min_count = tmp; + } + } + return min_count; +} + +static inline __u64 qmt_lqes_alloc_expand(const struct lu_env *env, + __u64 slv_granted, __u64 spare) +{ + __u64 min_count, tmp; + bool flag = false; + int i; + + for (i = 0, min_count = 0; i < qti_lqes_cnt(env); i++) { + /* Don't take into account not enforced lqes that belong + * to non global pool. These lqes present in array to + * support actual lqe_granted even for lqes without limits. */ + if (!qti_lqes(env)[i]->lqe_enforced && + !qti_lqes(env)[i]->lqe_is_global) + continue; + + tmp = qmt_alloc_expand(qti_lqes(env)[i], slv_granted, spare); + if (flag) { + min_count = tmp < min_count ? tmp : min_count; + } else { + flag = true; + min_count = tmp; + } + } + return min_count; +} + +static inline void qmt_lqes_tune_grace(const struct lu_env *env, __u64 now) +{ + int i; + + for (i = 0; i < qti_lqes_cnt(env); i++) { + struct lquota_entry *lqe; + + lqe = qti_lqes(env)[i]; + if (lqe->lqe_softlimit != 0) { + if (lqe->lqe_granted > lqe->lqe_softlimit && + lqe->lqe_gracetime == 0) { + /* First time over soft limit, let's start grace + * timer */ + lqe->lqe_gracetime = now + qmt_lqe_grace(lqe); + } else if (lqe->lqe_granted <= lqe->lqe_softlimit && + lqe->lqe_gracetime != 0) { + /* Clear grace timer */ + lqe->lqe_gracetime = 0; + } + } + } +} + /* * Helper function to handle quota request from slave. * * \param env - is the environment passed by the caller - * \param lqe - is the lquota_entry subject to the quota request * \param qmt - is the master device * \param uuid - is the uuid associated with the slave * \param qb_flags - are the quota request flags as packed in the quota_body @@ -455,16 +616,16 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, * -EINPROGRESS : inform client to retry write/create * -ve : other appropriate errors */ -int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe, - struct qmt_device *qmt, struct obd_uuid *uuid, __u32 qb_flags, - __u64 qb_count, __u64 qb_usage, struct quota_body *repbody) +int qmt_dqacq0(const struct lu_env *env, struct qmt_device *qmt, + struct obd_uuid *uuid, __u32 qb_flags, __u64 qb_count, + __u64 qb_usage, struct quota_body *repbody) { - struct qmt_thread_info *qti = qmt_info(env); __u64 now, count; struct dt_object *slv_obj = NULL; __u64 slv_granted, slv_granted_bck; struct thandle *th = NULL; int rc, ret; + struct lquota_entry *lqe = qti_lqes_glbl(env); ENTRY; LASSERT(uuid != NULL); @@ -476,6 +637,9 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe, if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RECOVERABLE_ERR)) RETURN(-cfs_fail_val); + if (qti_lqes_restore_init(env)) + RETURN(-ENOMEM); + /* look-up index file associated with acquiring slave */ slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, LQE_ROOT(lqe), lu_object_fid(&LQE_GLB_OBJ(lqe)->do_lu), @@ -489,30 +653,31 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe, /* allocate & start transaction with enough credits to update * global & slave indexes */ - th = qmt_trans_start_with_slv(env, lqe, slv_obj, &qti->qti_restore); + th = qmt_trans_start_with_slv(env, NULL, slv_obj, false); if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); - lqe_write_lock(lqe); - LQUOTA_DEBUG(lqe, "dqacq starts uuid:%s flags:0x%x wanted:"LPU64 - " usage:"LPU64, obd_uuid2str(uuid), qb_flags, qb_count, + qti_lqes_write_lock(env); + + LQUOTA_DEBUG_LQES(env, "dqacq starts uuid:%s flags:0x%x wanted:%llu" + " usage:%llu", obd_uuid2str(uuid), qb_flags, qb_count, qb_usage); /* Legal race, limits have been removed on master, but slave didn't * receive the change yet. Just return EINPROGRESS until the slave gets * notified. */ if (!lqe->lqe_enforced && !req_is_rel(qb_flags)) - GOTO(out_locked, rc = -EINPROGRESS); + GOTO(out_locked, rc = -ESRCH); /* recompute qunit in case it was never initialized */ - qmt_revalidate(env, lqe); + qmt_revalidate_lqes(env, qmt, qb_flags); /* slave just wants to acquire per-ID lock */ if (req_is_acq(qb_flags) && qb_count == 0) GOTO(out_locked, rc = 0); /* fetch how much quota space is already granted to this slave */ - rc = qmt_slv_read(env, lqe, slv_obj, &slv_granted); + rc = qmt_slv_read(env, &lqe->lqe_id, slv_obj, &slv_granted); if (rc) { LQUOTA_ERROR(lqe, "Failed to get granted for slave %s, rc=%d", obd_uuid2str(uuid), rc); @@ -523,23 +688,23 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe, slv_granted_bck = slv_granted; /* record current time for soft limit & grace time management */ - now = (__u64)cfs_time_current_sec(); + now = ktime_get_real_seconds(); if (req_is_rel(qb_flags)) { /* Slave would like to release quota space */ if (slv_granted < qb_count || - lqe->lqe_granted < qb_count) { + !qmt_lqes_can_rel(env, qb_count)) { /* can't release more than granted */ - LQUOTA_ERROR(lqe, "Release too much! uuid:%s release:" - LPU64" granted:"LPU64", total:"LPU64, - obd_uuid2str(uuid), qb_count, - slv_granted, lqe->lqe_granted); + LQUOTA_ERROR_LQES(env, + "Release too much! uuid:%s release: %llu granted:%llu, total:%llu", + obd_uuid2str(uuid), qb_count, + slv_granted, lqe->lqe_granted); GOTO(out_locked, rc = -EINVAL); } repbody->qb_count = qb_count; /* put released space back to global pool */ - QMT_REL(lqe, slv_granted, qb_count); + qmt_rel_lqes(env, &slv_granted, qb_count); GOTO(out_write, rc = 0); } @@ -548,18 +713,18 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe, * out to be using more quota space than owned, so we adjust * granted space regardless of the current state of affairs */ repbody->qb_count = qb_usage - slv_granted; - QMT_GRANT(lqe, slv_granted, repbody->qb_count); + qmt_grant_lqes(env, &slv_granted, repbody->qb_count); } if (!req_is_acq(qb_flags) && !req_is_preacq(qb_flags)) GOTO(out_write, rc = 0); - qmt_adjust_edquot(lqe, now); - if (lqe->lqe_edquot) + qmt_adjust_edquot_notify(env, qmt, now, qb_flags); + if (qti_lqes_edquot(env)) /* no hope to claim further space back */ GOTO(out_write, rc = -EDQUOT); - if (qmt_space_exhausted(lqe, now)) { + if (qmt_space_exhausted_lqes(env, now)) { /* might have some free space once rebalancing is completed */ rc = req_is_acq(qb_flags) ? -EINPROGRESS : -EDQUOT; GOTO(out_write, rc); @@ -570,41 +735,39 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe, * reports in qb_count how much spare quota space it owns and we * can grant back quota space which is consistent with qunit * value. */ - - if (qb_count >= lqe->lqe_qunit) + if (qb_count >= qti_lqes_min_qunit(env)) /* slave already own the maximum it should */ GOTO(out_write, rc = 0); - count = qmt_alloc_expand(lqe, slv_granted, qb_count); + count = qmt_lqes_alloc_expand(env, slv_granted, qb_count); if (count == 0) GOTO(out_write, rc = -EDQUOT); repbody->qb_count += count; - QMT_GRANT(lqe, slv_granted, count); + qmt_grant_lqes(env, &slv_granted, count); GOTO(out_write, rc = 0); } /* processing acquire request with clients waiting */ - if (lqe->lqe_hardlimit != 0 && - lqe->lqe_granted + qb_count > lqe->lqe_hardlimit) { + if (qmt_lqes_cannot_grant(env, qb_count)) { /* cannot grant as much as asked, but can still afford to grant * some quota space back */ - count = lqe->lqe_hardlimit - lqe->lqe_granted; + count = qmt_lqes_grant_some_quota(env); repbody->qb_count += count; - QMT_GRANT(lqe, slv_granted, count); + qmt_grant_lqes(env, &slv_granted, count); GOTO(out_write, rc = 0); } /* Whouhou! we can satisfy the slave request! */ repbody->qb_count += qb_count; - QMT_GRANT(lqe, slv_granted, qb_count); + qmt_grant_lqes(env, &slv_granted, qb_count); /* Try to expand the acquired count for DQACQ */ - count = qmt_alloc_expand(lqe, slv_granted, 0); + count = qmt_lqes_alloc_expand(env, slv_granted, 0); if (count != 0) { /* can even grant more than asked, it is like xmas ... */ repbody->qb_count += count; - QMT_GRANT(lqe, slv_granted, count); + qmt_grant_lqes(env, &slv_granted, count); GOTO(out_write, rc = 0); } @@ -614,35 +777,25 @@ out_write: GOTO(out_locked, rc); /* start/stop grace timer if required */ - if (lqe->lqe_softlimit != 0) { - if (lqe->lqe_granted > lqe->lqe_softlimit && - lqe->lqe_gracetime == 0) - /* first time over soft limit, let's start grace - * timer */ - lqe->lqe_gracetime = now + qmt_lqe_grace(lqe); - else if (lqe->lqe_granted <= lqe->lqe_softlimit && - lqe->lqe_gracetime != 0) - /* Clear grace timer */ - lqe->lqe_gracetime = 0; - } + qmt_lqes_tune_grace(env, now); /* Update slave index first since it is easier to roll back */ ret = qmt_slv_write(env, th, lqe, slv_obj, LQUOTA_BUMP_VER, &repbody->qb_slv_ver, slv_granted); if (ret) { /* restore initial quota settings */ - qmt_restore(lqe, &qti->qti_restore); + qmt_restore_lqes(env); /* reset qb_count */ repbody->qb_count = 0; GOTO(out_locked, rc = ret); } /* Update global index, no version bump needed */ - ret = qmt_glb_write(env, th, lqe, 0, NULL); + ret = qmt_glb_write_lqes(env, th, 0, NULL); if (ret) { rc = ret; /* restore initial quota settings */ - qmt_restore(lqe, &qti->qti_restore); + qmt_restore_lqes(env); /* reset qb_count */ repbody->qb_count = 0; @@ -654,26 +807,27 @@ out_write: "value rc:%d ret%d", rc, ret); LBUG(); } - qmt_adjust_edquot(lqe, now); + qmt_adjust_edquot_notify(env, qmt, now, qb_flags); GOTO(out_locked, rc); } /* Total granted has been changed, let's try to adjust the qunit * size according to the total granted & limits. */ - qmt_adjust_qunit(env, lqe); /* clear/set edquot flag and notify slaves via glimpse if needed */ - qmt_adjust_edquot(lqe, now); + qmt_adjust_and_notify(env, qmt, now, qb_flags); out_locked: - LQUOTA_DEBUG(lqe, "dqacq ends count:"LPU64" ver:"LPU64" rc:%d", + LQUOTA_DEBUG_LQES(env, "dqacq ends count:%llu ver:%llu rc:%d", repbody->qb_count, repbody->qb_slv_ver, rc); - lqe_write_unlock(lqe); + qti_lqes_write_unlock(env); out: + qti_lqes_restore_fini(env); + if (th != NULL && !IS_ERR(th)) dt_trans_stop(env, qmt->qmt_child, th); if (slv_obj != NULL && !IS_ERR(slv_obj)) - lu_object_put(env, &slv_obj->do_lu); + dt_object_put(env, slv_obj); if ((req_is_acq(qb_flags) || req_is_preacq(qb_flags)) && OBD_FAIL_CHECK(OBD_FAIL_QUOTA_EDQUOT)) { @@ -687,6 +841,55 @@ out: } /* + * Extract index from uuid or quota index file name. + * + * \param[in] uuid uuid or quota index name(0x1020000-OST0001_UUID) + * \param[out] idx pointer to save index + * + * \retval slave type(QMT_STYPE_MDT or QMT_STYPE_OST) + * \retval -EINVAL wrong uuid + */ +int qmt_uuid2idx(struct obd_uuid *uuid, int *idx) +{ + char *uuid_str, *name, *dash; + int rc = -EINVAL; + + uuid_str = (char *)uuid->uuid; + + if (strnlen(uuid_str, UUID_MAX) >= UUID_MAX) { + CERROR("quota: UUID '%.*s' missing trailing NUL: rc = %d\n", + UUID_MAX, uuid_str, rc); + return rc; + } + + dash = strrchr(uuid_str, '-'); + name = dash + 1; + /* Going to get index from MDTXXXX/OSTXXXX. Thus uuid should + * have at least 8 bytes after '-': 3 for MDT/OST, 4 for index + * and 1 byte for null character. */ + if (*dash != '-' || ((uuid_str + UUID_MAX - name) < 8)) { + CERROR("quota: wrong UUID format '%s': rc = %d\n", + uuid_str, rc); + return rc; + } + + rc = target_name2index(name, idx, NULL); + switch (rc) { + case LDD_F_SV_TYPE_MDT: + rc = QMT_STYPE_MDT; + break; + case LDD_F_SV_TYPE_OST: + rc = QMT_STYPE_OST; + break; + default: + CERROR("quota: wrong UUID type '%s': rc = %d\n", uuid_str, rc); + rc = -EINVAL; + } + + RETURN(rc); +} + +/* * Handle quota request from slave. * * \param env - is the environment passed by the caller @@ -696,13 +899,12 @@ out: static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld, struct ptlrpc_request *req) { - struct qmt_device *qmt = lu2qmt_dev(ld); - struct quota_body *qbody, *repbody; - struct obd_uuid *uuid; - struct ldlm_lock *lock; - struct lquota_entry *lqe; - int pool_id, pool_type, qtype; - int rc; + struct qmt_device *qmt = lu2qmt_dev(ld); + struct quota_body *qbody, *repbody; + struct obd_uuid *uuid; + struct ldlm_lock *lock; + int rtype, qtype; + int rc, idx, stype; ENTRY; qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY); @@ -723,6 +925,9 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld, LDLM_LOCK_PUT(lock); uuid = &req->rq_export->exp_client_uuid; + stype = qmt_uuid2idx(uuid, &idx); + if (stype < 0) + RETURN(stype); if (req_is_rel(qbody->qb_flags) + req_is_acq(qbody->qb_flags) + req_is_preacq(qbody->qb_flags) > 1) { @@ -752,13 +957,13 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld, RETURN(-ENOLCK); } - if ((lock->l_flags & LDLM_FL_AST_SENT) != 0) { - struct ptlrpc_service_part *svc; - unsigned int timeout; + if (ldlm_is_ast_sent(lock)) { + struct ptlrpc_service_part *svc; + timeout_t timeout; svc = req->rq_rqbd->rqbd_svcpt; timeout = at_est2timeout(at_get(&svc->scp_at_estimate)); - timeout = max(timeout, ldlm_timeout); + timeout += (ldlm_bl_timeout(lock) >> 1); /* lock is being cancelled, prolong timeout */ ldlm_refresh_waiting_lock(lock, timeout); @@ -766,28 +971,29 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld, LDLM_LOCK_PUT(lock); } - /* extract pool & quota information from global index FID packed in the + /* extract quota information from global index FID packed in the * request */ - rc = lquota_extract_fid(&qbody->qb_fid, &pool_id, &pool_type, &qtype); + rc = lquota_extract_fid(&qbody->qb_fid, &rtype, &qtype); if (rc) RETURN(-EINVAL); /* Find the quota entry associated with the quota id */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype, - &qbody->qb_id); - if (IS_ERR(lqe)) - RETURN(PTR_ERR(lqe)); + rc = qmt_pool_lqes_lookup(env, qmt, rtype, stype, qtype, + &qbody->qb_id, NULL, idx); + if (rc) + RETURN(rc); - /* process quota request */ - rc = qmt_dqacq0(env, lqe, qmt, uuid, qbody->qb_flags, qbody->qb_count, - qbody->qb_usage, repbody); + rc = qmt_dqacq0(env, qmt, uuid, qbody->qb_flags, + qbody->qb_count, qbody->qb_usage, repbody); if (lustre_handle_is_used(&qbody->qb_lockh)) /* return current qunit value only to slaves owning an per-ID * quota lock. For enqueue, the qunit value will be returned in * the LVB */ - repbody->qb_qunit = lqe->lqe_qunit; - lqe_putref(lqe); + repbody->qb_qunit = qti_lqes_min_qunit(env); + CDEBUG(D_QUOTA, "qmt_dqacq return qb_qunit %llu qb_count %llu\n", + repbody->qb_qunit, repbody->qb_count); + qti_lqes_fini(env); RETURN(rc); }