X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fquota%2Fqmt_handler.c;h=8eb2824c195d8de85813ad524f51236838991a67;hb=6c0b4329d046de283eeb254fca561be9386df68a;hp=c355da2d3d9695a54d55d9992b0ab024649ddbc2;hpb=5549b1b9e032c6eae63c9bde7104769697b93a0f;p=fs%2Flustre-release.git diff --git a/lustre/quota/qmt_handler.c b/lustre/quota/qmt_handler.c index c355da2..8eb2824c 100644 --- a/lustre/quota/qmt_handler.c +++ b/lustre/quota/qmt_handler.c @@ -113,7 +113,7 @@ static void qmt_set_id_notify(const struct lu_env *env, struct qmt_device *qmt, lqe_gl = lqe->lqe_is_global ? lqe : NULL; rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe), lqe_qtype(lqe), &lqe->lqe_id); - if (!qti_lqes_cnt(env)) + if (rc) GOTO(lqes_fini, rc); if (!lqe_gl && qti_lqes_glbl(env)->lqe_is_global) @@ -122,8 +122,11 @@ static void qmt_set_id_notify(const struct lu_env *env, struct qmt_device *qmt, if (!lqe_gl) GOTO(lqes_fini, rc); + mutex_lock(&lqe_gl->lqe_glbl_data_lock); if (lqe_gl->lqe_glbl_data) qmt_seed_glbe(env, lqe_gl->lqe_glbl_data); + mutex_unlock(&lqe_gl->lqe_glbl_data_lock); + /* Even if slaves haven't enqueued quota lock yet, * it is needed to set lqe_revoke_time in qmt_id_lock_glimpse * in case of reaching qpi_least_qunit */ @@ -202,6 +205,10 @@ int qmt_set_with_lqe(const struct lu_env *env, struct qmt_device *qmt, /* change quota limits */ lqe->lqe_hardlimit = hard; lqe->lqe_softlimit = soft; + if (is_default) { + dirtied = true; + GOTO(quota_write, 0); + } quota_set: /* recompute qunit in case it was never initialized */ @@ -245,6 +252,7 @@ quota_set: dirtied = true; } +quota_write: if (dirtied) { if (!is_updated) { /* write new quota settings to disk */ @@ -335,13 +343,206 @@ static int qmt_set(const struct lu_env *env, struct qmt_device *qmt, if (IS_ERR(lqe)) RETURN(PTR_ERR(lqe)); + lqe->lqe_is_deleted = 0; + lqe->lqe_is_reset = 0; rc = qmt_set_with_lqe(env, qmt, lqe, hard, soft, time, valid, is_default, is_updated); + if (rc == 0) + lqe->lqe_is_deleted = 0; + lqe_putref(lqe); RETURN(rc); } /* + * Delete the quota setting of the specified quota ID + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or + * inode (i.e. LQUOTA_RES_MD) + * \param qtype - is the quota type + * \param qid - is the quota indentifier for which we want to delete its + * quota settings. + */ +static int qmt_delete_qid(const struct lu_env *env, struct qmt_device *qmt, + __u8 restype, __u8 qtype, __u64 qid) +{ + struct qmt_thread_info *qti = qmt_info(env); + union lquota_id *quota_id = &qti->qti_id; + struct thandle *th = NULL; + struct qmt_pool_info *qpi = NULL; + struct lquota_entry *lqe = NULL; + __u64 ver = 0; + int rc; + + ENTRY; + + quota_id->qid_uid = qid; + lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, quota_id, NULL); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + lqe_write_lock(lqe); + + qpi = qmt_pool_lookup_glb(env, qmt, restype); + if (IS_ERR(qpi)) + GOTO(out, rc = -ENOMEM); + + th = qmt_trans_start(env, lqe); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + rc = lquota_disk_delete(env, th, + qpi->qpi_glb_obj[qtype], qid, &ver); + + dt_trans_stop(env, qmt->qmt_child, th); + + if (rc == 0) { + lqe_set_deleted(lqe); + qmt_glb_lock_notify(env, lqe, ver); + } else if (rc == -ENOENT) { + rc = 0; + } + +out: + if (!IS_ERR_OR_NULL(qpi)) + qpi_putref(env, qpi); + + lqe_write_unlock(lqe); + lqe_putref(lqe); + + RETURN(rc); +} + +static int qmt_reset_slv_cb(const struct lu_env *env, struct lu_fid *glb_fid, + char *slv_name, struct lu_fid *slv_fid, void *arg) +{ + struct qmt_device *qmt = (struct qmt_device *)arg; + struct qmt_thread_info *qti = qmt_info(env); + struct dt_object *slv_obj = NULL; + struct lquota_slv_rec rec; + struct thandle *th = NULL; + int rc; + + slv_obj = dt_locate(env, qmt->qmt_child, slv_fid); + if (IS_ERR(slv_obj)) + GOTO(out, rc = PTR_ERR(slv_obj)); + + if (slv_obj->do_index_ops == NULL) { + rc = slv_obj->do_ops->do_index_try(env, slv_obj, + &dt_quota_slv_features); + if (rc) { + CERROR("%s: fail to setup slave idx for %s: rc = %d\n", + qmt->qmt_child->dd_lu_dev.ld_obd->obd_name, + slv_name, rc); + GOTO(out, rc); + } + } + + th = qmt_trans_start(env, qti_lqes(env)[0]); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + rec.qsr_granted = 0; + rc = lquota_disk_write(env, th, slv_obj, &qti->qti_id, + (struct dt_rec *)&rec, 0, NULL); + if (rc) + CERROR("%s: failed to reset slave grant for %s: rc = %d\n", + qmt->qmt_child->dd_lu_dev.ld_obd->obd_name, slv_name, + rc); +out: + if (!IS_ERR_OR_NULL(th)) + dt_trans_stop(env, qmt->qmt_child, th); + + if (slv_obj != NULL) + dt_object_put(env, slv_obj); + return 0; +} + +/* + * Reset the quota of the quota ID, it will reset the soft/hard limit and grant + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or + * inode (i.e. LQUOTA_RES_MD) + * \param qtype - is the quota type + * \param qid - is the quota indentifier for which we want to delete its + * quota settings. + */ +static int qmt_reset_qid(const struct lu_env *env, struct qmt_device *qmt, + __u8 restype, __u8 qtype, __u64 qid) +{ + struct qmt_thread_info *qti = qmt_info(env); + union lquota_id *quota_id = &qti->qti_id; + struct qmt_pool_info *qpi = NULL; + struct lquota_entry *lqe = NULL; + struct thandle *th = NULL; + __u64 softlimit = 0, hardlimit = 0; + __u64 ver = 0; + int rc; + + ENTRY; + + quota_id->qid_uid = qid; + lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, quota_id, NULL); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + lqe_write_lock(lqe); + + qpi = qmt_pool_lookup_glb(env, qmt, restype); + if (IS_ERR(qpi)) + GOTO(out, rc = -ENOMEM); + + th = qmt_trans_start(env, lqe); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + softlimit = lqe->lqe_softlimit; + hardlimit = lqe->lqe_hardlimit; + + lqe->lqe_softlimit = 0; + lqe->lqe_hardlimit = 0; + lqe->lqe_granted = 0; + lqe->lqe_edquot = 0; + lqe->lqe_qunit = 0; + lqe->lqe_is_default = 0; + lqe->lqe_is_deleted = 0; + lqe->lqe_is_reset = 1; + rc = qmt_glb_write(env, th, lqe, LQUOTA_BUMP_VER, &ver); + if (rc) + LQUOTA_ERROR(lqe, "failed to write quota global rec\n"); + dt_trans_stop(env, qmt->qmt_child, th); + if (rc) + GOTO(out, rc); + + lquota_generate_fid(&qti->qti_fid, restype, qtype); + qti_lqes(env)[0] = lqe; + lquota_disk_for_each_slv(env, qpi->qpi_root, &qti->qti_fid, + qmt_reset_slv_cb, qmt); + + qmt_glb_lock_notify(env, lqe, ver); + +out: + if (rc) { + if (softlimit != 0) + lqe->lqe_softlimit = softlimit; + if (hardlimit != 0) + lqe->lqe_hardlimit = hardlimit; + lqe->lqe_is_reset = 0; + } + + if (!IS_ERR_OR_NULL(qpi)) + qpi_putref(env, qpi); + + lqe_write_unlock(lqe); + lqe_putref(lqe); + + RETURN(rc); +} +/* * Handle quotactl request. * * \param env - is the environment passed by the caller @@ -482,6 +683,51 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, false, poolname); break; + case LUSTRE_Q_DELETEQID: + rc = qmt_delete_qid(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, + oqctl->qc_id); + if (rc) + break; + + rc = qmt_delete_qid(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, + oqctl->qc_id); + break; + + case LUSTRE_Q_RESETQID: + if (oqctl->qc_id == 0) + RETURN(-EINVAL); + + id->qid_uid = oqctl->qc_id; + /* save the quota setting before resetting */ + rc = qmt_get(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, id, + &dqb->dqb_ihardlimit, &dqb->dqb_isoftlimit, + &dqb->dqb_itime, false, NULL); + if (rc) + break; + else + dqb->dqb_valid |= QIF_ILIMITS | QIF_ITIME; + + rc = qmt_get(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, id, + &dqb->dqb_bhardlimit, &dqb->dqb_bsoftlimit, + &dqb->dqb_btime, false, NULL); + if (rc) + break; + + dqb->dqb_valid |= QIF_BLIMITS | QIF_BTIME; + dqb->dqb_curinodes = 0; + dqb->dqb_curspace = 0; + + /* reset the corresponding quota ID */ + rc = qmt_reset_qid(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, + oqctl->qc_id); + if (rc) + break; + + rc = qmt_reset_qid(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, + oqctl->qc_id); + break; + + default: CERROR("%s: unsupported quotactl command: %d\n", qmt->qmt_svname, oqctl->qc_cmd); @@ -518,14 +764,22 @@ static inline bool qmt_lqes_can_rel(const struct lu_env *env, __u64 cnt) return can_release; } -static inline void qmt_rel_lqes(const struct lu_env *env, __u64 *slv, __u64 cnt) +static inline void qmt_rel_lqes(const struct lu_env *env, __u64 *slv, __u64 cnt, + bool reset) { int i; - for (i = 0; i < qti_lqes_cnt(env); i++) - qti_lqe_granted(env, i) -= cnt; + for (i = 0; i < qti_lqes_cnt(env); i++) { + if (reset) + qti_lqe_granted(env, i) = 0; + else + qti_lqe_granted(env, i) -= cnt; + } - *slv -= cnt; + if (reset) + *slv = 0; + else + *slv -= cnt; } static inline bool qmt_lqes_cannot_grant(const struct lu_env *env, __u64 cnt) @@ -625,6 +879,7 @@ static inline void qmt_lqes_tune_grace(const struct lu_env *env, __u64 now) * acquire/release * \param qb_usage - is the current space usage on the slave * \param repbody - is the quota_body of reply + * \param idx - is the index of a slave target * * \retval 0 : success * \retval -EDQUOT : out of quota @@ -633,9 +888,9 @@ static inline void qmt_lqes_tune_grace(const struct lu_env *env, __u64 now) */ int qmt_dqacq0(const struct lu_env *env, struct qmt_device *qmt, struct obd_uuid *uuid, __u32 qb_flags, __u64 qb_count, - __u64 qb_usage, struct quota_body *repbody) + __u64 qb_usage, struct quota_body *repbody, int idx) { - __u64 now, count; + __u64 now, count = 0; struct dt_object *slv_obj = NULL; __u64 slv_granted, slv_granted_bck; struct thandle *th = NULL; @@ -695,6 +950,13 @@ int qmt_dqacq0(const struct lu_env *env, struct qmt_device *qmt, if (req_is_acq(qb_flags) && qb_count == 0) GOTO(out_locked, rc = 0); + if (lqe->lqe_is_reset) { + lqe->lqe_granted = 0; + repbody->qb_count = qb_count; + qmt_rel_lqes(env, &slv_granted, qb_count, lqe->lqe_is_reset); + GOTO(out_locked, rc = 0); + } + /* fetch how much quota space is already granted to this slave */ rc = qmt_slv_read(env, &lqe->lqe_id, slv_obj, &slv_granted); if (rc) { @@ -723,7 +985,7 @@ int qmt_dqacq0(const struct lu_env *env, struct qmt_device *qmt, repbody->qb_count = qb_count; /* put released space back to global pool */ - qmt_rel_lqes(env, &slv_granted, qb_count); + qmt_rel_lqes(env, &slv_granted, qb_count, lqe->lqe_is_reset); GOTO(out_write, rc = 0); } @@ -798,6 +1060,9 @@ out_write: /* start/stop grace timer if required */ qmt_lqes_tune_grace(env, now); + if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_GRANT)) + slv_granted = 0xFFFFFFFFFFF00000; + /* Update slave index first since it is easier to roll back */ ret = qmt_slv_write(env, th, lqe, slv_obj, LQUOTA_BUMP_VER, &repbody->qb_slv_ver, slv_granted); @@ -834,7 +1099,7 @@ out_write: * size according to the total granted & limits. */ /* clear/set edquot flag and notify slaves via glimpse if needed */ - qmt_adjust_and_notify(env, qmt, now, qb_flags); + qmt_adjust_notify_nu(env, qmt, now, qb_flags, idx); out_locked: LQUOTA_DEBUG_LQES(env, "dqacq ends count:%llu ver:%llu rc:%d", repbody->qb_count, repbody->qb_slv_ver, rc); @@ -1003,7 +1268,8 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld, RETURN(rc); rc = qmt_dqacq0(env, qmt, uuid, qbody->qb_flags, - qbody->qb_count, qbody->qb_usage, repbody); + qbody->qb_count, qbody->qb_usage, repbody, + qmt_dom(rtype, stype) ? -1 : idx); if (lustre_handle_is_used(&qbody->qb_lockh)) /* return current qunit value only to slaves owning an per-ID