X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fquota%2Fqmt_entry.c;h=92d83b69bf40b986132caf79d8f8ce79d7c49ca4;hb=d4978678b49102226a79a6c8e5d10075d416977d;hp=a1eb9c79581b40316bf76edb9c185b8499a3c0ee;hpb=f0736a6a52ed95814d2cac875caf34f7fc233bf3;p=fs%2Flustre-release.git diff --git a/lustre/quota/qmt_entry.c b/lustre/quota/qmt_entry.c index a1eb9c7..92d83b6 100644 --- a/lustre/quota/qmt_entry.c +++ b/lustre/quota/qmt_entry.c @@ -44,6 +44,7 @@ static void qmt_lqe_init(struct lquota_entry *lqe, void *arg) lqe->lqe_revoke_time = 0; init_rwsem(&lqe->lqe_sem); + mutex_init(&lqe->lqe_glbl_data_lock); } /* Apply the default quota setting to the specified quota entry @@ -301,8 +302,7 @@ struct thandle *qmt_trans_start(const struct lu_env *env, int qmt_glb_write_lqes(const struct lu_env *env, struct thandle *th, __u32 flags, __u64 *ver) { - int i, rc; - rc = 0; + int i, rc = 0; for (i = 0; i < qti_lqes_cnt(env); i++) { rc = qmt_glb_write(env, th, qti_lqes(env)[i], flags, ver); @@ -354,7 +354,8 @@ int qmt_glb_write(const struct lu_env *env, struct thandle *th, if (lqe->lqe_is_default) { rec->qbr_hardlimit = 0; rec->qbr_softlimit = 0; - rec->qbr_time = LQUOTA_GRACE_FLAG(0, LQUOTA_FLAG_DEFAULT); + rec->qbr_time = LQUOTA_GRACE_FLAG(lqe->lqe_gracetime, + LQUOTA_FLAG_DEFAULT); } else { rec->qbr_hardlimit = lqe->lqe_hardlimit; rec->qbr_softlimit = lqe->lqe_softlimit; @@ -652,6 +653,33 @@ __u64 qmt_alloc_expand(struct lquota_entry *lqe, __u64 granted, __u64 spare) RETURN(0); } +static inline void +qmt_adjust_qunit_set_revoke(const struct lu_env *env, struct lquota_entry *lqe, + unsigned long least_qunit) +{ + struct lquota_entry *lqe2; + time64_t min = 0; + int i; + + if (qti_lqes_cnt(env) <= 1) + return; + + for (i = 0; i < qti_lqes_cnt(env); i++) { + lqe2 = qti_lqes(env)[i]; + if ((lqe2->lqe_qunit == least_qunit) && lqe2->lqe_revoke_time) { + if (!min) { + min = lqe2->lqe_revoke_time; + continue; + } + min = lqe2->lqe_revoke_time < min ? + lqe2->lqe_revoke_time : min; + } + } + + lqe->lqe_revoke_time = min; +} + + /* * Adjust qunit size according to quota limits and total granted count. * The caller must have locked the lqe. @@ -771,10 +799,17 @@ done: /* reset revoke time */ lqe->lqe_revoke_time = 0; - if (lqe->lqe_qunit >= qunit && - (lqe->lqe_qunit == pool->qpi_least_qunit)) { - /* initial qunit value is the smallest one */ - lqe->lqe_revoke_time = ktime_get_seconds(); + if (lqe->lqe_qunit == pool->qpi_least_qunit) { + if (lqe->lqe_qunit >= qunit) + /* initial qunit value is the smallest one */ + lqe->lqe_revoke_time = ktime_get_seconds(); + /* If there are several lqes and lqe_revoke_time is set for + * some of them, it means appropriate OSTs have been already + * notified with the least qunit and there is no chance to + * free more space. Find an lqe with the minimum(earliest) + * revoke_time and set this time to the current one. + */ + qmt_adjust_qunit_set_revoke(env, lqe, pool->qpi_least_qunit); } RETURN(need_reseed); } @@ -808,8 +843,13 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env, return need_reseed; } - if (lqe_gl->lqe_glbl_data && need_reseed) { - qmt_seed_glbe_all(env, lqe_gl->lqe_glbl_data, qunit, edquot); + if (need_reseed) { + mutex_lock(&lqe_gl->lqe_glbl_data_lock); + if (lqe_gl->lqe_glbl_data) + qmt_seed_glbe_all(env, lqe_gl->lqe_glbl_data, qunit, + edquot); + mutex_unlock(&lqe_gl->lqe_glbl_data_lock); + qmt_id_lock_notify(qmt, lqe_gl); } return need_reseed; @@ -846,17 +886,23 @@ void qmt_revalidate_lqes(const struct lu_env *env, for (i = 0; i < qti_lqes_cnt(env); i++) need_notify |= qmt_revalidate(env, qti_lqes(env)[i]); + if (!need_notify) + return; + /* There could be no ID lock to the moment of reconciliation. * As a result lqe global data is not initialised yet. It is ok * for release and report requests. */ if (!lqe_gl->lqe_glbl_data && - (req_is_rel(qb_flags) || req_has_rep(qb_flags))) + (req_is_rel(qb_flags) || req_has_rep(qb_flags))) { return; + } - if (need_notify) { + mutex_lock(&lqe_gl->lqe_glbl_data_lock); + if (lqe_gl->lqe_glbl_data) qmt_seed_glbe(env, lqe_gl->lqe_glbl_data); - qmt_id_lock_notify(qmt, lqe_gl); - } + mutex_unlock(&lqe_gl->lqe_glbl_data_lock); + + qmt_id_lock_notify(qmt, lqe_gl); } void qti_lqes_init(const struct lu_env *env) @@ -872,7 +918,7 @@ int qti_lqes_add(const struct lu_env *env, struct lquota_entry *lqe) { struct qmt_thread_info *qti = qmt_info(env); - if (qti->qti_lqes_cnt > qti->qti_lqes_num) { + if (qti->qti_lqes_cnt >= qti->qti_lqes_num) { struct lquota_entry **lqes; lqes = qti->qti_lqes; OBD_ALLOC(lqes, sizeof(lqe) * qti->qti_lqes_num * 2); @@ -942,15 +988,20 @@ void qti_lqes_fini(const struct lu_env *env) if (qti->qti_lqes_num > QMT_MAX_POOL_NUM) OBD_FREE(qti->qti_lqes, qti->qti_lqes_num * sizeof(struct lquota_entry *)); + + qti->qti_lqes_num = 0; + qti->qti_lqes_cnt = 0; } -int qti_lqes_min_qunit(const struct lu_env *env) +__u64 qti_lqes_min_qunit(const struct lu_env *env) { - int i, min, qunit; + __u64 min, qunit; + int i; for (i = 1, min = qti_lqe_qunit(env, 0); i < qti_lqes_cnt(env); i++) { qunit = qti_lqe_qunit(env, i); - if (qunit < min) + /* if qunit is 0, lqe is not enforced and we can ignore it */ + if (qunit && qunit < min) min = qunit; } @@ -973,7 +1024,7 @@ int qti_lqes_restore_init(const struct lu_env *env) { int rc = 0; - if (qti_lqes_cnt(env) > QMT_MAX_POOL_NUM) { + if (qti_lqes_inited(env) && qti_lqes_cnt(env) > QMT_MAX_POOL_NUM) { OBD_ALLOC(qmt_info(env)->qti_lqes_rstr, qti_lqes_cnt(env) * sizeof(struct qmt_lqe_restore)); if (!qti_lqes_rstr(env)) @@ -985,7 +1036,7 @@ int qti_lqes_restore_init(const struct lu_env *env) void qti_lqes_restore_fini(const struct lu_env *env) { - if (qti_lqes_cnt(env) > QMT_MAX_POOL_NUM) + if (qti_lqes_inited(env) && qti_lqes_cnt(env) > QMT_MAX_POOL_NUM) OBD_FREE(qmt_info(env)->qti_lqes_rstr, qti_lqes_cnt(env) * sizeof(struct qmt_lqe_restore)); } @@ -1050,6 +1101,8 @@ void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd, int i, j, idx; ENTRY; + if (!qti_lqes_cnt(env)) + RETURN_EXIT; /* lqes array is sorted by qunit - the first entry has minimum qunit. * Thus start seeding global qunit's array beginning from the 1st lqe * and appropriate pool. If pools overlapped, slaves from this