lqe->lqe_revoke_time = 0;
init_rwsem(&lqe->lqe_sem);
+ mutex_init(&lqe->lqe_glbl_data_lock);
}
/* Apply the default quota setting to the specified quota entry
int qmt_glb_write_lqes(const struct lu_env *env, struct thandle *th,
__u32 flags, __u64 *ver)
{
- int i, rc;
- rc = 0;
+ int i, rc = 0;
for (i = 0; i < qti_lqes_cnt(env); i++) {
rc = qmt_glb_write(env, th, qti_lqes(env)[i], flags, ver);
if (lqe->lqe_is_default) {
rec->qbr_hardlimit = 0;
rec->qbr_softlimit = 0;
- rec->qbr_time = LQUOTA_GRACE_FLAG(0, LQUOTA_FLAG_DEFAULT);
+ rec->qbr_time = LQUOTA_GRACE_FLAG(lqe->lqe_gracetime,
+ LQUOTA_FLAG_DEFAULT);
} else {
rec->qbr_hardlimit = lqe->lqe_hardlimit;
rec->qbr_softlimit = lqe->lqe_softlimit;
RETURN(0);
}
+/* Reconcile lqe_revoke_time across the lquota entries currently set up in
+ * the env: find the earliest (minimum non-zero) lqe_revoke_time among the
+ * entries whose qunit has already shrunk to the pool's least qunit, and
+ * store that time in \a lqe.  If no such entry exists, \a lqe's revoke
+ * time is reset to 0.
+ *
+ * \param env         - lu environment holding the qti lqes array
+ * \param lqe         - entry whose lqe_revoke_time is (re)set
+ * \param least_qunit - least qunit value of the pool
+ *
+ * NOTE(review): assumes the caller already holds the lqe lock, as for the
+ * other qunit-adjusting helpers - confirm at the call site.
+ */
+static inline void
+qmt_adjust_qunit_set_revoke(const struct lu_env *env, struct lquota_entry *lqe,
+			    unsigned long least_qunit)
+{
+	struct lquota_entry *lqe2;
+	time64_t min = 0;
+	int i;
+
+	/* with a single (or no) lqe there is nothing to reconcile with */
+	if (qti_lqes_cnt(env) <= 1)
+		return;
+
+	for (i = 0; i < qti_lqes_cnt(env); i++) {
+		lqe2 = qti_lqes(env)[i];
+		/* only entries already at the least qunit and with a
+		 * revoke time set are candidates */
+		if ((lqe2->lqe_qunit == least_qunit) && lqe2->lqe_revoke_time) {
+			if (!min) {
+				/* first candidate found */
+				min = lqe2->lqe_revoke_time;
+				continue;
+			}
+			/* keep the earliest revoke time seen so far */
+			min = lqe2->lqe_revoke_time < min ?
+				lqe2->lqe_revoke_time : min;
+		}
+	}
+
+	/* remains 0 when no candidate matched */
+	lqe->lqe_revoke_time = min;
+}
+
+
/*
* Adjust qunit size according to quota limits and total granted count.
* The caller must have locked the lqe.
/* reset revoke time */
lqe->lqe_revoke_time = 0;
- if (lqe->lqe_qunit >= qunit &&
- (lqe->lqe_qunit == pool->qpi_least_qunit)) {
- /* initial qunit value is the smallest one */
- lqe->lqe_revoke_time = ktime_get_seconds();
+ if (lqe->lqe_qunit == pool->qpi_least_qunit) {
+ if (lqe->lqe_qunit >= qunit)
+ /* initial qunit value is the smallest one */
+ lqe->lqe_revoke_time = ktime_get_seconds();
+ /* If there are several lqes and lqe_revoke_time is set for
+ * some of them, it means appropriate OSTs have been already
+ * notified with the least qunit and there is no chance to
+ * free more space. Find an lqe with the minimum(earliest)
+ * revoke_time and set this time to the current one.
+ */
+ qmt_adjust_qunit_set_revoke(env, lqe, pool->qpi_least_qunit);
}
RETURN(need_reseed);
}
return need_reseed;
}
- if (lqe_gl->lqe_glbl_data && need_reseed) {
- qmt_seed_glbe_all(env, lqe_gl->lqe_glbl_data, qunit, edquot);
+ if (need_reseed) {
+ mutex_lock(&lqe_gl->lqe_glbl_data_lock);
+ if (lqe_gl->lqe_glbl_data)
+ qmt_seed_glbe_all(env, lqe_gl->lqe_glbl_data, qunit,
+ edquot);
+ mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
+
qmt_id_lock_notify(qmt, lqe_gl);
}
return need_reseed;
for (i = 0; i < qti_lqes_cnt(env); i++)
need_notify |= qmt_revalidate(env, qti_lqes(env)[i]);
+ if (!need_notify)
+ return;
+
/* There could be no ID lock to the moment of reconciliation.
* As a result lqe global data is not initialised yet. It is ok
* for release and report requests. */
if (!lqe_gl->lqe_glbl_data &&
- (req_is_rel(qb_flags) || req_has_rep(qb_flags)))
+ (req_is_rel(qb_flags) || req_has_rep(qb_flags))) {
return;
+ }
- if (need_notify) {
+ mutex_lock(&lqe_gl->lqe_glbl_data_lock);
+ if (lqe_gl->lqe_glbl_data)
qmt_seed_glbe(env, lqe_gl->lqe_glbl_data);
- qmt_id_lock_notify(qmt, lqe_gl);
- }
+ mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
+
+ qmt_id_lock_notify(qmt, lqe_gl);
}
void qti_lqes_init(const struct lu_env *env)
{
struct qmt_thread_info *qti = qmt_info(env);
- if (qti->qti_lqes_cnt > qti->qti_lqes_num) {
+ if (qti->qti_lqes_cnt >= qti->qti_lqes_num) {
struct lquota_entry **lqes;
lqes = qti->qti_lqes;
OBD_ALLOC(lqes, sizeof(lqe) * qti->qti_lqes_num * 2);
if (qti->qti_lqes_num > QMT_MAX_POOL_NUM)
OBD_FREE(qti->qti_lqes,
qti->qti_lqes_num * sizeof(struct lquota_entry *));
+
+ qti->qti_lqes_num = 0;
+ qti->qti_lqes_cnt = 0;
}
-int qti_lqes_min_qunit(const struct lu_env *env)
+__u64 qti_lqes_min_qunit(const struct lu_env *env)
{
- int i, min, qunit;
+ __u64 min, qunit;
+ int i;
for (i = 1, min = qti_lqe_qunit(env, 0); i < qti_lqes_cnt(env); i++) {
qunit = qti_lqe_qunit(env, i);
- if (qunit < min)
+ /* if qunit is 0, lqe is not enforced and we can ignore it */
+ if (qunit && qunit < min)
min = qunit;
}
{
int rc = 0;
- if (qti_lqes_cnt(env) > QMT_MAX_POOL_NUM) {
+ if (qti_lqes_inited(env) && qti_lqes_cnt(env) > QMT_MAX_POOL_NUM) {
OBD_ALLOC(qmt_info(env)->qti_lqes_rstr,
qti_lqes_cnt(env) * sizeof(struct qmt_lqe_restore));
if (!qti_lqes_rstr(env))
void qti_lqes_restore_fini(const struct lu_env *env)
{
- if (qti_lqes_cnt(env) > QMT_MAX_POOL_NUM)
+ if (qti_lqes_inited(env) && qti_lqes_cnt(env) > QMT_MAX_POOL_NUM)
OBD_FREE(qmt_info(env)->qti_lqes_rstr,
qti_lqes_cnt(env) * sizeof(struct qmt_lqe_restore));
}
int i, j, idx;
ENTRY;
+ if (!qti_lqes_cnt(env))
+ RETURN_EXIT;
/* lqes array is sorted by qunit - the first entry has minimum qunit.
* Thus start seeding global qunit's array beginning from the 1st lqe
* and appropriate pool. If pools overlapped, slaves from this