case IT_QUOTA_DQACQ: {
struct lquota_entry *lqe;
struct ldlm_lock *lock;
- int idx;
+ int idx, stype;
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
/* acquire on global lock? something is wrong ... */
GOTO(out, rc = -ENOLCK);
LDLM_LOCK_PUT(lock);
- rc = qmt_uuid2idx(uuid, &idx);
- if (rc < 0)
+ stype = qmt_uuid2idx(uuid, &idx);
+ if (stype < 0)
GOTO(out, rc = -EINVAL);
/* TODO: it seems we don't need to get lqe from
LASSERT(lqe != NULL);
lqe_getref(lqe);
- rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+ rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
lqe_qtype(lqe), &reqbody->qb_id,
NULL, idx);
if (rc) {
/* acquire quota space */
rc = qmt_dqacq0(env, qmt, uuid,
reqbody->qb_flags, reqbody->qb_count,
- reqbody->qb_usage, repbody);
+ reqbody->qb_usage, repbody,
+ qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
lqe_putref(lqe);
qti_lqes_fini(env);
if (rc)
/* There is no array to store lge for the case of DOM.
* Ignore it until MDT pools are ready. */
- if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) {
+ if (!qmt_dom(lqe_rtype(lqe), stype)) {
lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
struct ldlm_lock *lock;
struct obd_export *exp;
bool need_revoke;
- int rc = 0, idx;
+ int rc = 0, idx, stype;
ENTRY;
LASSERT(res != NULL);
GOTO(out, rc = -EFAULT);
}
- rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
- if (rc < 0)
- GOTO(out_exp, rc);
+ stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
+ if (stype < 0)
+ GOTO(out_exp, rc = stype);
- need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx);
+ need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
if (lvb->lvb_id_rel == 0) {
/* nothing to release */
if (lvb->lvb_id_may_rel != 0)
if (!need_revoke && lvb->lvb_id_rel == 0)
GOTO(out_exp, rc = 0);
- rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe),
- &lqe->lqe_id, NULL, idx);
+ rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
+ lqe_qtype(lqe), &lqe->lqe_id, NULL, idx);
if (rc)
GOTO(out_exp, rc);
- if (need_revoke && qmt_set_revoke(env, lqe, rc, idx) &&
- lqe->lqe_glbl_data) {
- qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
- qmt_id_lock_notify(qmt, lqe);
+ if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
+ mutex_lock(&lqe->lqe_glbl_data_lock);
+ if (lqe->lqe_glbl_data) {
+ qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
+ qmt_id_lock_notify(qmt, lqe);
+ }
+ mutex_unlock(&lqe->lqe_glbl_data_lock);
}
if (lvb->lvb_id_rel) {
/* release quota space */
rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
- 0, &qti->qti_body);
+ 0, &qti->qti_body,
+ qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
LQUOTA_ERROR(lqe,
"failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
struct lquota_entry *lqe = res->lr_lvb_data;
- struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+ struct lqe_glbl_data *lgd;
mutex_lock(&lqe->lqe_glbl_data_lock);
+ lgd = lqe->lqe_glbl_data;
lqe->lqe_glbl_data = NULL;
- qmt_free_lqe_gd(lgd);
mutex_unlock(&lqe->lqe_glbl_data_lock);
+ qmt_free_lqe_gd(lgd);
/* release lqe reference */
lqe_putref(lqe);
struct qmt_gl_lock_array *array,
qmt_glimpse_cb_t cb, void *arg)
{
+ struct lquota_entry *lqe = arg;
struct list_head *pos;
unsigned long count = 0;
int fail_cnt = 0;
LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
again:
+ if (cb)
+ mutex_lock(&lqe->lqe_glbl_data_lock);
lock_res(res);
/* scan list of granted locks */
list_for_each(pos, &res->lr_granted) {
}
}
unlock_res(res);
+ if (cb)
+ mutex_unlock(&lqe->lqe_glbl_data_lock);
if (count > array->q_max) {
qmt_free_lock_array(array);
RETURN(0);
}
-void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
- struct lquota_entry *lqe)
+static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
+ struct lquota_entry *lqe)
{
struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
- struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
int idx, stype;
__u64 qunit;
bool edquot;
LASSERT(stype >= 0);
/* DOM case - set global lqe settings */
- if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) {
+ if (qmt_dom(lqe_rtype(lqe), stype)) {
edquot = lqe->lqe_edquot;
qunit = lqe->lqe_qunit;
} else {
- edquot = lgd->lqeg_arr[idx].lge_edquot;
- qunit = lgd->lqeg_arr[idx].lge_qunit;
+ struct lqe_glbl_data *lgd;
+
+ mutex_lock(&lqe->lqe_glbl_data_lock);
+ lgd = lqe->lqe_glbl_data;
+ if (lgd) {
+ edquot = lgd->lqeg_arr[idx].lge_edquot;
+ qunit = lgd->lqeg_arr[idx].lge_qunit;
+ } else {
+ edquot = lqe->lqe_edquot;
+ qunit = lqe->lqe_qunit;
+ }
+ mutex_unlock(&lqe->lqe_glbl_data_lock);
}
/* fill glimpse descriptor with lqe settings */
qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
{
union ldlm_gl_desc *descs = NULL;
- struct lqe_glbl_data *gld;
struct list_head *tmp, *pos;
LIST_HEAD(gl_list);
struct qmt_gl_lock_array locks;
int rc = 0;
ENTRY;
- gld = lqe ? lqe->lqe_glbl_data : NULL;
memset(&locks, 0, sizeof(locks));
rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
if (rc) {
locks_count = locks.q_cnt;
/* Use one desc for all works, when called from qmt_glb_lock_notify */
- if (gld && locks.q_cnt > 1) {
+ if (cb && locks.q_cnt > 1) {
/* TODO: consider storing these preallocated descs
* in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
* The benefit is that we don't need to allocate/free
continue;
}
- if (gld) {
+ if (cb) {
if (descs)
desc = &descs[i - 1];
qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
LQUOTA_FLAG_DELETED);
+ } else if (lqe->lqe_is_reset) {
+ qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+ qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+ qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+ LQUOTA_FLAG_RESET);
} else {
qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
/* Quota pools support only OSTs, even though MDTs can also be registered
* as LQUOTA_RES_DT devices (DOM). */
- if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+ if (qmt_dom(lqe_rtype(lqe), stype))
return 1;
else
- return lgd->lqeg_arr[idx].lge_edquot_nu ||
- lgd->lqeg_arr[idx].lge_qunit_nu;
+ return lgd ? lgd->lqeg_arr[idx].lge_edquot_nu ||
+ lgd->lqeg_arr[idx].lge_qunit_nu : 0;
}
}
lqe_write_lock(lqe);
+ /*
+ * The same lqe may be added a second time while a glimpse
+ * triggered by its first addition is still in flight.
+ */
+ if (lqe->lqe_gl)
+ GOTO(out, 0);
/* The purpose of glimpse callback on per-ID lock is twofold:
* - notify slaves of new qunit value and hope they will release some
* spare quota space in return
* replies if needed */
lqe->lqe_may_rel = 0;
- /* The rebalance thread is the only thread which can issue glimpses */
- LASSERT(!lqe->lqe_gl);
lqe->lqe_gl = true;
lqe_write_unlock(lqe);
/* issue glimpse callback to slaves */
- qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
- qmt_id_lock_cb, lqe);
+ if (lqe->lqe_glbl_data)
+ qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
+ qmt_id_lock_cb, lqe);
lqe_write_lock(lqe);
if (lqe->lqe_revoke_time == 0 &&
}
LASSERT(lqe->lqe_gl);
lqe->lqe_gl = false;
+out:
lqe_write_unlock(lqe);
-
ldlm_resource_putref(res);
EXIT;
}
list_del_init(&lqe->lqe_link);
spin_unlock(&qmt->qmt_reba_lock);
- if (!kthread_should_stop())
+ /* lqe_ref == 1 means we hold the last ref,
+ * so no need to send glimpse callbacks.
+ */
+ if (!kthread_should_stop() &&
+ atomic_read(&lqe->lqe_ref) > 1)
qmt_id_lock_glimpse(env, qmt, lqe, NULL);
lqe_putref(lqe);