#define DEBUG_SUBSYSTEM S_LQUOTA
#include <linux/kthread.h>
+#include <linux/workqueue.h>
#include <lustre_dlm.h>
#include <lustre_swab.h>
#include "qmt_internal.h"
+struct workqueue_struct *qmt_lvbo_free_wq;
+
/* intent policy function called from mdt_intent_opc() when the intent is of
* quota type */
int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
case IT_QUOTA_DQACQ: {
struct lquota_entry *lqe;
struct ldlm_lock *lock;
- int idx;
+ int idx, stype;
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
/* acquire on global lock? something is wrong ... */
GOTO(out, rc = -ENOLCK);
LDLM_LOCK_PUT(lock);
- rc = qmt_uuid2idx(uuid, &idx);
- if (rc < 0)
+ stype = qmt_uuid2idx(uuid, &idx);
+ if (stype < 0)
GOTO(out, rc = -EINVAL);
/* TODO: it seems we don't need to get lqe from
LASSERT(lqe != NULL);
lqe_getref(lqe);
- rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+ rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
lqe_qtype(lqe), &reqbody->qb_id,
NULL, idx);
if (rc) {
/* acquire quota space */
rc = qmt_dqacq0(env, qmt, uuid,
reqbody->qb_flags, reqbody->qb_count,
- reqbody->qb_usage, repbody);
+ reqbody->qb_usage, repbody,
+ qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
lqe_putref(lqe);
qti_lqes_fini(env);
if (rc)
*/
static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
{
-	unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
-	struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+	unsigned long least = lqe2qpi(lqe)->qpi_least_qunit;
+	bool revoke = false;
	/* There is no array to store lge for the case of DOM.
-	 * Ignore it until MDT pools will be ready. */
-	if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) {
-		lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
-		lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
-
-		/* We shouldn't call revoke for DOM case, it will be updated
-		 * at qmt_id_lock_glimpse. */
-		return (lgd->lqeg_arr[idx].lge_qunit == least_qunit);
+	 * Ignore it until MDT pools are ready.
+	 */
+	if (!qmt_dom(lqe_rtype(lqe), stype)) {
+		struct lqe_glbl_data *lgd;
+
+		/* lqe_glbl_data can be released concurrently (it is now
+		 * freed through qmt_lvbo_free_wq), so dereference it only
+		 * under lqe_glbl_data_lock and tolerate a NULL pointer.
+		 */
+		mutex_lock(&lqe->lqe_glbl_data_lock);
+		lgd = lqe->lqe_glbl_data;
+		if (lgd) {
+			int lge_idx = qmt_map_lge_idx(lgd, idx);
+
+			lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0;
+			lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0;
+			/* We shouldn't call revoke for DOM case, it will be
+			 * updated at qmt_id_lock_glimpse.
+			 */
+			revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least;
+		}
+		mutex_unlock(&lqe->lqe_glbl_data_lock);
	}
-	return false;
+	return revoke;
}
-static void qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe,
+/*
+ * If the per-target qunit for @idx has already shrunk to the pool's
+ * smallest qunit, stamp lqe_revoke_time and re-evaluate edquot on every
+ * lqe collected in the env array.  Returns true when an edquot state
+ * changed, i.e. the caller should re-seed the global entries and send
+ * glimpses to the slaves.
+ */
+static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl,
			   int stype, int idx)
{
-	unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
-	struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
-
-	if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) {
-		int i;
-
-		qti_lqes_write_lock(env);
-		for (i = 0; i < qti_lqes_cnt(env); i++) {
-			LQUOTA_DEBUG(qti_lqes(env)[i],
-				     "idx %d lge_qunit %llu least_qunit %lu\n",
-				     idx, lgd->lqeg_arr[idx].lge_qunit,
-				     least_qunit);
-			if (qti_lqes(env)[i]->lqe_qunit == least_qunit) {
-				qti_lqes(env)[i]->lqe_revoke_time =
+	unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit;
+	bool notify = false;
+
+	/* DOM case keeps no per-target lge array to consult */
+	if (qmt_dom(lqe_rtype(lqe_gl), stype))
+		return false;
+
+	/* lock order: qti_lqes write lock first, then lqe_glbl_data_lock;
+	 * the NULL check below covers concurrent teardown of lqe_glbl_data.
+	 */
+	qti_lqes_write_lock(env);
+	mutex_lock(&lqe_gl->lqe_glbl_data_lock);
+	if (lqe_gl->lqe_glbl_data) {
+		struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
+		int lge_idx;
+
+		lge_idx = qmt_map_lge_idx(lgd, idx);
+		if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) {
+			struct lquota_entry *lqe;
+			int i;
+
+			for (i = 0; i < qti_lqes_cnt(env); i++) {
+				lqe = qti_lqes(env)[i];
+				LQUOTA_DEBUG(lqe,
+					     "lge_qunit %llu least_qunit %lu idx %d\n",
+					     lgd->lqeg_arr[lge_idx].lge_qunit,
+					     least_qunit, idx);
+				if (lqe->lqe_qunit == least_qunit) {
+					lqe->lqe_revoke_time =
					ktime_get_seconds();
-				qmt_adjust_edquot(qti_lqes(env)[i],
+					notify |= qmt_adjust_edquot(lqe,
						  ktime_get_real_seconds());
+				}
			}
		}
-		qti_lqes_write_unlock(env);
	}
+	mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
+	qti_lqes_write_unlock(env);
+
+	return notify;
}
/*
struct ldlm_lock *lock;
struct obd_export *exp;
bool need_revoke;
- int rc = 0, idx;
+ int rc = 0, idx, stype;
ENTRY;
LASSERT(res != NULL);
GOTO(out, rc = -EFAULT);
}
- rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
- if (rc < 0)
- GOTO(out_exp, rc);
+ stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
+ if (stype < 0)
+ GOTO(out_exp, rc = stype);
- need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx);
+ need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
if (lvb->lvb_id_rel == 0) {
/* nothing to release */
if (lvb->lvb_id_may_rel != 0)
if (!need_revoke && lvb->lvb_id_rel == 0)
GOTO(out_exp, rc = 0);
- rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe),
- &lqe->lqe_id, NULL, idx);
+ rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
+ lqe_qtype(lqe), &lqe->lqe_id, NULL, idx);
if (rc)
GOTO(out_exp, rc);
- if (need_revoke)
- qmt_set_revoke(env, lqe, rc, idx);
+ if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
+ int notify = false;
+
+ mutex_lock(&lqe->lqe_glbl_data_lock);
+ if (lqe->lqe_glbl_data) {
+ qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
+ notify = true;
+ }
+ mutex_unlock(&lqe->lqe_glbl_data_lock);
+ if (notify)
+ qmt_id_lock_notify(qmt, lqe);
+ }
if (lvb->lvb_id_rel) {
LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
/* release quota space */
rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
- 0, &qti->qti_body);
+ 0, &qti->qti_body,
+ qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
LQUOTA_ERROR(lqe,
"failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
* we are thus dealing with an ID lock. */
struct lquota_entry *lqe = res->lr_lvb_data;
struct qmt_device *qmt;
- struct obd_uuid *uuid;
+ struct obd_uuid *uuid;
int idx;
uuid = &(lock)->l_export->exp_client_uuid;
lqe_putref(lqe);
} else {
/* global quota lock */
- struct dt_object *obj = res->lr_lvb_data;
+ struct dt_object *obj = res->lr_lvb_data;
/* return current version of global index */
qlvb->lvb_glb_ver = dt_version_get(env, obj);
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
struct lquota_entry *lqe = res->lr_lvb_data;
- struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
- /* release lqe reference */
- lqe->lqe_glbl_data = NULL;
- lqe_putref(lqe);
- qmt_free_lqe_gd(lgd);
+ queue_work(qmt_lvbo_free_wq, &lqe->lqe_work);
} else {
struct dt_object *obj = res->lr_lvb_data;
/* release object reference */
array->q_locks[i] = NULL;
}
array->q_cnt = 0;
- OBD_FREE(array->q_locks, array->q_max * sizeof(*array->q_locks));
+ OBD_FREE_PTR_ARRAY(array->q_locks, array->q_max);
array->q_locks = NULL;
array->q_max = 0;
}
struct qmt_gl_lock_array *array,
qmt_glimpse_cb_t cb, void *arg)
{
+ struct lquota_entry *lqe = arg;
struct list_head *pos;
unsigned long count = 0;
int fail_cnt = 0;
LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
again:
+ if (cb)
+ mutex_lock(&lqe->lqe_glbl_data_lock);
lock_res(res);
/* scan list of granted locks */
list_for_each(pos, &res->lr_granted) {
}
}
unlock_res(res);
+ if (cb)
+ mutex_unlock(&lqe->lqe_glbl_data_lock);
if (count > array->q_max) {
qmt_free_lock_array(array);
array->q_max = count + count / 2 + 10;
count = 0;
LASSERT(array->q_locks == NULL && array->q_cnt == 0);
- OBD_ALLOC(array->q_locks,
- sizeof(*array->q_locks) * array->q_max);
+ OBD_ALLOC_PTR_ARRAY(array->q_locks, array->q_max);
if (array->q_locks == NULL) {
array->q_max = 0;
RETURN(-ENOMEM);
RETURN(0);
}
-void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
- struct lquota_entry *lqe)
+static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
+ struct lquota_entry *lqe)
{
struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
- struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
int idx, stype;
__u64 qunit;
bool edquot;
LASSERT(stype >= 0);
/* DOM case - set global lqe settings */
- if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) {
+ if (qmt_dom(lqe_rtype(lqe), stype)) {
edquot = lqe->lqe_edquot;
qunit = lqe->lqe_qunit;
} else {
- edquot = lgd->lqeg_arr[idx].lge_edquot;
- qunit = lgd->lqeg_arr[idx].lge_qunit;
+ struct lqe_glbl_data *lgd;
+ int lge_idx;
+
+ mutex_lock(&lqe->lqe_glbl_data_lock);
+ lgd = lqe->lqe_glbl_data;
+ if (lgd) {
+ lge_idx = qmt_map_lge_idx(lgd, idx);
+ edquot = lgd->lqeg_arr[lge_idx].lge_edquot;
+ qunit = lgd->lqeg_arr[lge_idx].lge_qunit;
+ } else {
+ edquot = lqe->lqe_edquot;
+ qunit = lqe->lqe_qunit;
+ }
+ mutex_unlock(&lqe->lqe_glbl_data_lock);
}
/* fill glimpse descriptor with lqe settings */
qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
{
union ldlm_gl_desc *descs = NULL;
- struct lqe_glbl_data *gld;
struct list_head *tmp, *pos;
LIST_HEAD(gl_list);
struct qmt_gl_lock_array locks;
int rc = 0;
ENTRY;
- gld = lqe ? lqe->lqe_glbl_data : NULL;
memset(&locks, 0, sizeof(locks));
rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
if (rc) {
locks_count = locks.q_cnt;
/* Use one desc for all works, when called from qmt_glb_lock_notify */
- if (gld && locks.q_cnt > 1) {
+ if (cb && locks.q_cnt > 1) {
/* TODO: think about to store this preallocated descs
* in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
* The benefit is that we don't need to allocate/free
continue;
}
- if (gld) {
+ if (cb) {
if (descs)
desc = &descs[i - 1];
qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
LQUOTA_FLAG_DEFAULT);
+ } else if (lqe->lqe_is_deleted) {
+ qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
+ qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
+ qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+ LQUOTA_FLAG_DELETED);
+ } else if (lqe->lqe_is_reset) {
+ qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+ qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+ qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+ LQUOTA_FLAG_RESET);
+ } else if (lqe->lqe_granted > lqe->lqe_hardlimit) {
+ qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+ qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+ qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+ LQUOTA_FLAG_REVOKE);
} else {
qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
/* look up ldlm resource associated with global index */
fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
- res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
+ res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, &qti->qti_resid,
LDLM_PLAIN, 0);
if (IS_ERR(res)) {
/* this might happen if no slaves have enqueued global quota
* broadcasting the new qunit value */
static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
{
-	struct obd_uuid	*uuid = &(lock)->l_export->exp_client_uuid;
+	struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
	struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
	int idx;
	int stype = qmt_uuid2idx(uuid, &idx);
	LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
+	CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n",
+	       stype, lqe_rtype(lqe), idx, uuid->uuid);
	/* Quota pools support only OSTs, despite MDTs also could be registered
	 * as LQUOTA_RES_DT devices(DOM). */
-	if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+	if (qmt_dom(lqe_rtype(lqe), stype))
		return 1;
-	else
-		return lgd->lqeg_arr[idx].lge_edquot_nu ||
-		       lgd->lqeg_arr[idx].lge_qunit_nu;
+
+	/* NOTE(review): qmt_alloc_lock_array takes lqe_glbl_data_lock around
+	 * the granted-lock scan when a callback is set, so lgd should be
+	 * stable here; it may still be NULL if the global entry is being
+	 * torn down, in which case there is nothing to notify.
+	 */
+	if (lgd) {
+		int lge_idx = qmt_map_lge_idx(lgd, idx);
+
+		CDEBUG(D_QUOTA,
+		       "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n",
+		       idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu,
+		       lgd->lqeg_arr[lge_idx].lge_qunit_nu);
+		return lgd->lqeg_arr[lge_idx].lge_edquot_nu ||
+		       lgd->lqeg_arr[lge_idx].lge_qunit_nu;
+	}
+
+	return 0;
}
lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
- res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
- 0);
+ res = ldlm_resource_get(qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0);
if (IS_ERR(res)) {
/* this might legitimately happens if slaves haven't had the
* opportunity to enqueue quota lock yet. */
}
lqe_write_lock(lqe);
+ /*
+ * It is possible to add an lqe in a 2nd time while the same lqe
+ * from the 1st time is still sending glimpse
+ */
+ if (lqe->lqe_gl)
+ GOTO(out, 0);
/* The purpose of glimpse callback on per-ID lock is twofold:
* - notify slaves of new qunit value and hope they will release some
* spare quota space in return
* replies if needed */
lqe->lqe_may_rel = 0;
- /* The rebalance thread is the only thread which can issue glimpses */
- LASSERT(!lqe->lqe_gl);
lqe->lqe_gl = true;
lqe_write_unlock(lqe);
/* issue glimpse callback to slaves */
- qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
- qmt_id_lock_cb, lqe);
+ if (lqe->lqe_glbl_data)
+ qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
+ qmt_id_lock_cb, lqe);
lqe_write_lock(lqe);
if (lqe->lqe_revoke_time == 0 &&
}
LASSERT(lqe->lqe_gl);
lqe->lqe_gl = false;
+out:
lqe_write_unlock(lqe);
-
ldlm_resource_putref(res);
EXIT;
}
list_del_init(&lqe->lqe_link);
spin_unlock(&qmt->qmt_reba_lock);
- if (!kthread_should_stop())
+ /* lqe_ref == 1 means we hold the last ref,
+ * so no need to send glimpse callbacks.
+ */
+ if (!kthread_should_stop() &&
+ atomic_read(&lqe->lqe_ref) > 1)
qmt_id_lock_glimpse(env, qmt, lqe, NULL);
lqe_putref(lqe);
rc = lu_env_init(&args->qra_env, LCT_MD_THREAD);
if (rc) {
- CERROR("%s: failed to init env.", qmt->qmt_svname);
+ CERROR("%s: failed to init env.\n", qmt->qmt_svname);
GOTO(out_env, rc);
}