case IT_QUOTA_DQACQ: {
struct lquota_entry *lqe;
struct ldlm_lock *lock;
+ int idx;
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
/* acquire on global lock? something is wrong ... */
GOTO(out, rc = -ENOLCK);
LDLM_LOCK_PUT(lock);
+ rc = qmt_uuid2idx(uuid, &idx);
+ if (rc < 0)
+ GOTO(out, rc = -EINVAL);
+
+ /* TODO: it seems we don't need to get lqe from
+ * lq_lvb_data anymore ... And do extra get
+ * and put on it */
lqe = res->lr_lvb_data;
LASSERT(lqe != NULL);
lqe_getref(lqe);
+ rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+ lqe_qtype(lqe), &reqbody->qb_id,
+ NULL, idx);
+ if (rc) {
+ lqe_putref(lqe);
+ GOTO(out, rc);
+ }
+
/* acquire quota space */
- rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
- reqbody->qb_count, reqbody->qb_usage,
- repbody);
+ rc = qmt_dqacq0(env, qmt, uuid,
+ reqbody->qb_flags, reqbody->qb_count,
+ reqbody->qb_usage, repbody);
lqe_putref(lqe);
+ qti_lqes_fini(env);
if (rc)
GOTO(out, rc);
break;
/* on success, pack lvb in reply */
lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
lvb_len = ldlm_lvbo_size(*lockp);
- lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
+ lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len);
if (lvb_len < 0)
GOTO(out, rc = lvb_len);
struct lu_env *env;
struct qmt_thread_info *qti;
struct qmt_device *qmt = lu2qmt_dev(ld);
- int pool_id, pool_type, qtype;
+ int pool_type, qtype;
int rc;
ENTRY;
res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
RETURN(0);
- OBD_ALLOC_PTR(env);
- if (env == NULL)
- RETURN(-ENOMEM);
-
- /* initialize environment */
- rc = lu_env_init(env, LCT_MD_THREAD);
- if (rc != 0)
- GOTO(out_free, rc);
+ env = lu_env_find();
+ LASSERT(env);
qti = qmt_info(env);
/* extract global index FID and quota identifier */
fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
/* sanity check the global index FID */
- rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
+ rc = lquota_extract_fid(&qti->qti_fid, &pool_type, &qtype);
if (rc) {
- CERROR("can't extract pool information from FID "DFID"\n",
+ CERROR("can't extract glb index information from FID "DFID"\n",
PFID(&qti->qti_fid));
GOTO(out, rc);
}
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
/* no ID quota lock associated with UID/GID 0 or with a seq 0,
* we are thus dealing with an ID lock. */
+ struct qmt_pool_info *pool;
struct lquota_entry *lqe;
+ struct lqe_glbl_data *lgd;
+
+ pool = qmt_pool_lookup_glb(env, qmt, pool_type);
+ if (IS_ERR(pool))
+ GOTO(out, rc = -ENOMEM);
/* Find the quota entry associated with the quota id */
- lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
- &qti->qti_id);
- if (IS_ERR(lqe))
+ lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
+ &qti->qti_id, NULL);
+ if (IS_ERR(lqe)) {
+ qpi_putref(env, pool);
GOTO(out, rc = PTR_ERR(lqe));
+ }
+
+ /* TODO: need something like qmt_extend_lqe_gd that has
+ * to be calledeach time when qpi_slv_nr is incremented */
+ lgd = qmt_alloc_lqe_gd(pool, qtype);
+ if (!lgd) {
+ lqe_putref(lqe);
+ qpi_putref(env, pool);
+ GOTO(out, rc = -ENOMEM);
+ }
+
+ qmt_setup_lqe_gd(env, qmt, lqe, lgd, pool_type);
/* store reference to lqe in lr_lvb_data */
res->lr_lvb_data = lqe;
+ qpi_putref(env, pool);
LQUOTA_DEBUG(lqe, "initialized res lvb");
} else {
struct dt_object *obj;
res->lr_lvb_len = sizeof(struct lquota_lvb);
EXIT;
out:
- lu_env_fini(env);
-out_free:
- OBD_FREE_PTR(env);
return rc;
}
+/* clear lge_qunit/edquot_nu flags -
+ * slave recieved new qunit and edquot.
+ *
+ * \retval true if revoke is needed - qunit
+ * for this slave reaches least_qunit
+ */
+static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
+{
+ unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
+ struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+
+ /* There is no array to store lge for the case of DOM.
+ * Ignore it until MDT pools will be ready. */
+ if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) {
+ lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
+ lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
+
+ /* We shouldn't call revoke for DOM case, it will be updated
+ * at qmt_id_lock_glimpse. */
+ return (lgd->lqeg_arr[idx].lge_qunit == least_qunit);
+ }
+
+ return false;
+}
+
+static void qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe,
+ int stype, int idx)
+{
+ unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
+ struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+
+ if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) {
+ int i;
+
+ qti_lqes_write_lock(env);
+ for (i = 0; i < qti_lqes_cnt(env); i++) {
+ LQUOTA_DEBUG(qti_lqes(env)[i],
+ "idx %d lge_qunit %llu least_qunit %lu\n",
+ idx, lgd->lqeg_arr[idx].lge_qunit,
+ least_qunit);
+ if (qti_lqes(env)[i]->lqe_qunit == least_qunit) {
+ qti_lqes(env)[i]->lqe_revoke_time =
+ ktime_get_seconds();
+ qmt_adjust_edquot(qti_lqes(env)[i],
+ ktime_get_real_seconds());
+ }
+ }
+ qti_lqes_write_unlock(env);
+ }
+}
+
/*
* Update LVB associated with the global quota index.
* This function is called from the DLM itself after a glimpse callback, in this
struct lquota_lvb *lvb;
struct ldlm_lock *lock;
struct obd_export *exp;
- int rc = 0;
+ bool need_revoke;
+ int rc = 0, idx;
ENTRY;
LASSERT(res != NULL);
LASSERT(lqe != NULL);
lqe_getref(lqe);
- LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
- lvb->lvb_id_rel, lvb->lvb_id_may_rel);
-
- if (lvb->lvb_id_rel == 0) {
- /* nothing to release */
- if (lvb->lvb_id_may_rel != 0)
- /* but might still release later ... */
- lqe->lqe_may_rel += lvb->lvb_id_may_rel;
- GOTO(out_lqe, rc = 0);
- }
-
/* allocate environement */
- OBD_ALLOC_PTR(env);
- if (env == NULL)
- GOTO(out_lqe, rc = -ENOMEM);
-
- /* initialize environment */
- rc = lu_env_init(env, LCT_MD_THREAD);
- if (rc)
- GOTO(out_env, rc);
+ env = lu_env_find();
+ LASSERT(env);
qti = qmt_info(env);
/* The request is a glimpse callback which was sent via the
if (IS_ERR(lock)) {
CERROR("%s: failed to get lock from request!\n",
qmt->qmt_svname);
- GOTO(out_env_init, rc = PTR_ERR(lock));
+ GOTO(out, rc = PTR_ERR(lock));
}
exp = class_export_get(lock->l_export);
if (exp == NULL) {
CERROR("%s: failed to get export from lock!\n",
qmt->qmt_svname);
- GOTO(out_env_init, rc = -EFAULT);
+ GOTO(out, rc = -EFAULT);
}
- /* release quota space */
- rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
- QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
- if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
- LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
- "%llu!=%llu : rc = %d\n", qti->qti_body.qb_count,
- lvb->lvb_id_rel, rc);
- class_export_put(exp);
+ rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
+ if (rc < 0)
+ GOTO(out_exp, rc);
+
+ need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx);
+ if (lvb->lvb_id_rel == 0) {
+ /* nothing to release */
+ if (lvb->lvb_id_may_rel != 0)
+ /* but might still release later ... */
+ lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+ }
+
+ if (!need_revoke && lvb->lvb_id_rel == 0)
+ GOTO(out_exp, rc = 0);
+
+ rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe),
+ &lqe->lqe_id, NULL, idx);
+ if (rc)
+ GOTO(out_exp, rc);
+
+ if (need_revoke)
+ qmt_set_revoke(env, lqe, rc, idx);
+
+ if (lvb->lvb_id_rel) {
+ LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
+ lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+ /* release quota space */
+ rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
+ QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
+ 0, &qti->qti_body);
+ if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+ LQUOTA_ERROR(lqe,
+ "failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
+ qti->qti_body.qb_count,
+ lvb->lvb_id_rel, rc);
+ }
+ qti_lqes_fini(env);
if (rc)
- GOTO(out_env_init, rc);
+ GOTO(out_exp, rc);
EXIT;
-out_env_init:
- lu_env_fini(env);
-out_env:
- OBD_FREE_PTR(env);
-out_lqe:
+out_exp:
+ class_export_put(exp);
+out:
lqe_putref(lqe);
return rc;
}
int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
int lvblen)
{
- struct ldlm_resource *res = lock->l_resource;
- struct lquota_lvb *qlvb = lvb;
+ struct ldlm_resource *res = lock->l_resource;
+ struct lquota_lvb *qlvb = lvb;
+ struct lu_env *env;
+ int rc;
ENTRY;
LASSERT(res != NULL);
+ rc = 0;
if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
RETURN(-EINVAL);
+ env = lu_env_find();
+ LASSERT(env);
+
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
/* no ID quota lock associated with UID/GID 0 or with a seq 0,
* we are thus dealing with an ID lock. */
- struct lquota_entry *lqe = res->lr_lvb_data;
-
+ struct lquota_entry *lqe = res->lr_lvb_data;
+ struct qmt_device *qmt;
+ struct obd_uuid *uuid;
+ int idx;
+
+ uuid = &(lock)->l_export->exp_client_uuid;
+ rc = qmt_uuid2idx(uuid, &idx);
+ if (rc < 0)
+ RETURN(rc);
+ qmt = lu2qmt_dev(ld);
/* return current qunit value & edquot flags in lvb */
lqe_getref(lqe);
- qlvb->lvb_id_qunit = lqe->lqe_qunit;
- qlvb->lvb_flags = 0;
- if (lqe->lqe_edquot)
- qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+ rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+ lqe_qtype(lqe), &lqe->lqe_id,
+ NULL, idx);
+ if (!rc) {
+ qlvb->lvb_id_qunit = qti_lqes_min_qunit(env);
+ qlvb->lvb_flags = 0;
+ if (qti_lqes_edquot(env))
+ qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+ qti_lqes_fini(env);
+ }
+ CDEBUG(D_QUOTA, "uuid %s lqe_id %lu, edquot %llu qunit %llu\n",
+ (char *)uuid, (unsigned long)lqe->lqe_id.qid_uid,
+ qlvb->lvb_flags, qlvb->lvb_id_qunit);
lqe_putref(lqe);
} else {
/* global quota lock */
- struct lu_env *env;
- int rc;
struct dt_object *obj = res->lr_lvb_data;
- OBD_ALLOC_PTR(env);
- if (env == NULL)
- RETURN(-ENOMEM);
-
- /* initialize environment */
- rc = lu_env_init(env, LCT_LOCAL);
- if (rc) {
- OBD_FREE_PTR(env);
- RETURN(rc);
- }
-
/* return current version of global index */
qlvb->lvb_glb_ver = dt_version_get(env, obj);
-
- lu_env_fini(env);
- OBD_FREE_PTR(env);
}
- RETURN(sizeof(struct lquota_lvb));
+ RETURN(rc = rc ?: sizeof(struct lquota_lvb));
}
/*
RETURN(0);
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
- struct lquota_entry *lqe = res->lr_lvb_data;
+ struct lquota_entry *lqe = res->lr_lvb_data;
+ struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
/* release lqe reference */
+ lqe->lqe_glbl_data = NULL;
lqe_putref(lqe);
+ qmt_free_lqe_gd(lgd);
} else {
- struct dt_object *obj = res->lr_lvb_data;
- struct lu_env *env;
- int rc;
-
- OBD_ALLOC_PTR(env);
- if (env == NULL)
- RETURN(-ENOMEM);
-
- /* initialize environment */
- rc = lu_env_init(env, LCT_LOCAL);
- if (rc) {
- OBD_FREE_PTR(env);
- RETURN(rc);
- }
-
+ struct dt_object *obj = res->lr_lvb_data;
/* release object reference */
- dt_object_put(env, obj);
- lu_env_fini(env);
- OBD_FREE_PTR(env);
+ dt_object_put(lu_env_find(), obj);
}
res->lr_lvb_data = NULL;
RETURN(0);
}
-typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *);
+typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, struct lquota_entry *);
struct qmt_gl_lock_array {
unsigned long q_max;
array->q_locks[i] = NULL;
}
array->q_cnt = 0;
- OBD_FREE(array->q_locks, array->q_max * sizeof(*array->q_locks));
+ OBD_FREE_PTR_ARRAY(array->q_locks, array->q_max);
array->q_locks = NULL;
array->q_max = 0;
}
array->q_max = count + count / 2 + 10;
count = 0;
LASSERT(array->q_locks == NULL && array->q_cnt == 0);
- OBD_ALLOC(array->q_locks,
- sizeof(*array->q_locks) * array->q_max);
+ OBD_ALLOC_PTR_ARRAY(array->q_locks, array->q_max);
if (array->q_locks == NULL) {
array->q_max = 0;
RETURN(-ENOMEM);
RETURN(0);
}
+void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
+ struct lquota_entry *lqe)
+{
+ struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+ struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+ int idx, stype;
+ __u64 qunit;
+ bool edquot;
+
+ stype = qmt_uuid2idx(uuid, &idx);
+ LASSERT(stype >= 0);
+
+ /* DOM case - set global lqe settings */
+ if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) {
+ edquot = lqe->lqe_edquot;
+ qunit = lqe->lqe_qunit;
+ } else {
+ edquot = lgd->lqeg_arr[idx].lge_edquot;
+ qunit = lgd->lqeg_arr[idx].lge_qunit;
+ }
+
+ /* fill glimpse descriptor with lqe settings */
+ desc->lquota_desc.gl_flags = edquot ? LQUOTA_FL_EDQUOT : 0;
+ desc->lquota_desc.gl_qunit = qunit;
+ CDEBUG(D_QUOTA, "setup desc: stype %d idx %d, edquot %llu qunit %llu\n",
+ stype, idx, desc->lquota_desc.gl_flags,
+ desc->lquota_desc.gl_qunit);
+}
+
/*
* Send glimpse callback to slaves holding a lock on resource \res.
* This is used to notify slaves of new quota settings or to claim quota space
*/
static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
struct ldlm_resource *res, union ldlm_gl_desc *desc,
- qmt_glimpse_cb_t cb, void *arg)
+ qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
{
+ union ldlm_gl_desc *descs = NULL;
+ struct lqe_glbl_data *gld;
struct list_head *tmp, *pos;
- struct list_head gl_list = LIST_HEAD_INIT(gl_list);
+ LIST_HEAD(gl_list);
struct qmt_gl_lock_array locks;
- unsigned long i;
+ unsigned long i, locks_count;
int rc = 0;
ENTRY;
+ gld = lqe ? lqe->lqe_glbl_data : NULL;
memset(&locks, 0, sizeof(locks));
- rc = qmt_alloc_lock_array(res, &locks, cb, arg);
+ rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
if (rc) {
CERROR("%s: failed to allocate glimpse lock array (%d)\n",
qmt->qmt_svname, rc);
RETURN(rc);
}
+ if (!locks.q_cnt) {
+ CDEBUG(D_QUOTA, "%s: no granted locks to send glimpse\n",
+ qmt->qmt_svname);
+ RETURN(0);
+ }
+ CDEBUG(D_QUOTA, "found granted locks %lu\n", locks.q_cnt);
+ locks_count = locks.q_cnt;
+
+ /* Use one desc for all works, when called from qmt_glb_lock_notify */
+ if (gld && locks.q_cnt > 1) {
+ /* TODO: think about to store this preallocated descs
+ * in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
+ * The benefit is that we don't need to allocate/free
+ * and setup this descs each time. But the drawback is
+ * memory use (sizeof ldlm_gl_desc * OST_COUNT * user_number).
+ * for examfple it could be 88 * 256 * 10 000 about 225 MB. */
+ OBD_ALLOC(descs,
+ sizeof(struct ldlm_gl_lquota_desc) * locks.q_cnt);
+ if (!descs) {
+ CERROR("%s: alloc glimpse lock array failed: rc = %d\n",
+ qmt->qmt_svname, rc);
+ qmt_free_lock_array(&locks);
+ RETURN(-ENOMEM);
+ }
+ }
for (i = locks.q_cnt; i > 0; i--) {
struct ldlm_glimpse_work *work;
continue;
}
+ if (gld) {
+ if (descs)
+ desc = &descs[i - 1];
+ qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
+ work->gl_interpret_data = lqe;
+ }
+
list_add_tail(&work->gl_list, &gl_list);
work->gl_lock = locks.q_locks[i - 1];
work->gl_flags = 0;
if (list_empty(&gl_list)) {
CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
- RETURN(0);
+ GOTO(out, rc = 0);
}
/* issue glimpse callbacks to all connected slaves */
LDLM_LOCK_RELEASE(work->gl_lock);
OBD_FREE_PTR(work);
}
+out:
+ if (descs)
+ OBD_FREE(descs,
+ sizeof(struct ldlm_gl_lquota_desc) * locks_count);
RETURN(rc);
}
struct ldlm_resource *res = NULL;
ENTRY;
- lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
- pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
+ lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
/* send glimpse callback to notify slaves of new quota settings */
qti->qti_gl_desc.lquota_desc.gl_id = lqe->lqe_id;
qti->qti_gl_desc.lquota_desc.gl_flags = 0;
- qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
- qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
- qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
+ if (lqe->lqe_is_default) {
+ qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
+ qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
+ qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+ LQUOTA_FLAG_DEFAULT);
+
+ } else {
+ qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+ qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+ qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
+ }
qti->qti_gl_desc.lquota_desc.gl_ver = ver;
/* look up ldlm resource associated with global index */
/* Callback function used to select locks that should be glimpsed when
* broadcasting the new qunit value */
-static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg)
+static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
{
- struct obd_uuid *slv_uuid = arg;
- struct obd_uuid *uuid = &lock->l_export->exp_client_uuid;
+ struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+ struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+ int idx;
+ int stype = qmt_uuid2idx(uuid, &idx);
- if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
- RETURN(0);
- RETURN(+1);
+ LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
+
+ /* Quota pools support only OSTs, despite MDTs also could be registered
+ * as LQUOTA_RES_DT devices(DOM). */
+ if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+ return 1;
+ else
+ return lgd->lqeg_arr[idx].lge_edquot_nu ||
+ lgd->lqeg_arr[idx].lge_qunit_nu;
}
+
/*
* Send glimpse request on per-ID lock to push new qunit value to slave.
*
if (!lqe->lqe_enforced)
RETURN_EXIT;
- lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
- pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
+ lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
0);
* - notify slaves that master ran out of quota space and there is no
* need to send acquire request any more until further notice */
- /* fill glimpse descriptor with lqe settings */
- if (lqe->lqe_edquot)
- qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
- else
- qti->qti_gl_desc.lquota_desc.gl_flags = 0;
- qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
-
+ /* TODO: it is not clear how to implement below case for all lqes
+ * from where slaves will be notified in qmt_glimpse_lock. Because
+ * here we have just global lqe with an array of OSTs that should
+ * be notified. Theoretically we can find all lqes that includes
+ * these OSTs, but it is not trivial. So I would propose to move
+ * this case to another place ... */
if (lqe->lqe_revoke_time == 0 &&
- qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
+ lqe->lqe_qunit == pool->qpi_least_qunit)
/* reset lqe_may_rel, it will be updated on glimpse callback
* replies if needed */
lqe->lqe_may_rel = 0;
/* issue glimpse callback to slaves */
qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
- uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
+ qmt_id_lock_cb, lqe);
lqe_write_lock(lqe);
if (lqe->lqe_revoke_time == 0 &&
- qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
lqe->lqe_qunit == pool->qpi_least_qunit) {
lqe->lqe_revoke_time = ktime_get_seconds();
qmt_adjust_edquot(lqe, ktime_get_real_seconds());
bool added = false;
ENTRY;
+ LASSERT(lqe->lqe_is_global);
lqe_getref(lqe);
spin_lock(&qmt->qmt_reba_lock);
if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
added = true;
+ if (qmt->qmt_reba_task)
+ wake_up_process(qmt->qmt_reba_task);
}
spin_unlock(&qmt->qmt_reba_lock);
- if (added)
- wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
- else
+ if (!added)
lqe_putref(lqe);
EXIT;
}
+struct qmt_reba_args {
+ struct qmt_device *qra_dev;
+ struct lu_env qra_env;
+ struct completion *qra_started;
+};
+
+#ifndef TASK_IDLE
+#define TASK_IDLE TASK_INTERRUPTIBLE
+#endif
+
/*
* The rebalance thread is in charge of sending glimpse callbacks on per-ID
* quota locks owned by slaves in order to notify them of:
* try to acquire quota from the master since this latter has already
* distributed all the space.
*/
-static int qmt_reba_thread(void *arg)
+static int qmt_reba_thread(void *_args)
{
- struct qmt_device *qmt = (struct qmt_device *)arg;
- struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
- struct l_wait_info lwi = { 0 };
- struct lu_env *env;
+ struct qmt_reba_args *args = _args;
+ struct qmt_device *qmt = args->qra_dev;
+ struct lu_env *env = &args->qra_env;
struct lquota_entry *lqe, *tmp;
- int rc;
ENTRY;
- OBD_ALLOC_PTR(env);
- if (env == NULL)
- RETURN(-ENOMEM);
-
- rc = lu_env_init(env, LCT_MD_THREAD);
- if (rc) {
- CERROR("%s: failed to init env.", qmt->qmt_svname);
- OBD_FREE_PTR(env);
- RETURN(rc);
- }
-
- thread_set_flags(thread, SVC_RUNNING);
- wake_up(&thread->t_ctl_waitq);
-
- while (1) {
- l_wait_event(thread->t_ctl_waitq,
- !list_empty(&qmt->qmt_reba_list) ||
- !thread_is_running(thread), &lwi);
+ complete(args->qra_started);
+ while (({set_current_state(TASK_IDLE);
+ !kthread_should_stop(); })) {
spin_lock(&qmt->qmt_reba_lock);
list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
lqe_link) {
+ __set_current_state(TASK_RUNNING);
list_del_init(&lqe->lqe_link);
spin_unlock(&qmt->qmt_reba_lock);
- if (thread_is_running(thread))
+ if (!kthread_should_stop())
qmt_id_lock_glimpse(env, qmt, lqe, NULL);
lqe_putref(lqe);
spin_lock(&qmt->qmt_reba_lock);
}
spin_unlock(&qmt->qmt_reba_lock);
-
- if (!thread_is_running(thread))
- break;
+ schedule();
}
+ __set_current_state(TASK_RUNNING);
+
+ lu_env_remove(env);
lu_env_fini(env);
- OBD_FREE_PTR(env);
- thread_set_flags(thread, SVC_STOPPED);
- wake_up(&thread->t_ctl_waitq);
- RETURN(rc);
+ OBD_FREE_PTR(args);
+ RETURN(0);
}
/*
*/
int qmt_start_reba_thread(struct qmt_device *qmt)
{
- struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
- struct l_wait_info lwi = { 0 };
- struct task_struct *task;
+ struct task_struct *task;
+ struct qmt_reba_args *args;
+ DECLARE_COMPLETION_ONSTACK(started);
+ int rc;
ENTRY;
- task = kthread_run(qmt_reba_thread, (void *)qmt,
- "qmt_reba_%s", qmt->qmt_svname);
+ OBD_ALLOC_PTR(args);
+ if (args == NULL)
+ RETURN(-ENOMEM);
+ args->qra_dev = qmt;
+ args->qra_started = &started;
+
+ rc = lu_env_init(&args->qra_env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: failed to init env.", qmt->qmt_svname);
+ GOTO(out_env, rc);
+ }
+
+ task = kthread_create(qmt_reba_thread, args,
+ "qmt_reba_%s", qmt->qmt_svname);
if (IS_ERR(task)) {
CERROR("%s: failed to start rebalance thread (%ld)\n",
qmt->qmt_svname, PTR_ERR(task));
- thread_set_flags(thread, SVC_STOPPED);
- RETURN(PTR_ERR(task));
+ GOTO(out_env_fini, rc = PTR_ERR(task));
}
- l_wait_event(thread->t_ctl_waitq,
- thread_is_running(thread) || thread_is_stopped(thread),
- &lwi);
+ rc = lu_env_add_task(&args->qra_env, task);
+ if (rc) {
+ kthread_stop(task);
+ GOTO(out_env_fini, rc);
+ }
+ qmt->qmt_reba_task = task;
+ wake_up_process(task);
+ wait_for_completion(&started);
RETURN(0);
+out_env_fini:
+ lu_env_fini(&args->qra_env);
+out_env:
+ OBD_FREE_PTR(args);
+ RETURN(rc);
}
/*
*/
void qmt_stop_reba_thread(struct qmt_device *qmt)
{
- struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
+ struct task_struct *task;
- if (!thread_is_stopped(thread)) {
- struct l_wait_info lwi = { 0 };
+ spin_lock(&qmt->qmt_reba_lock);
+ task = qmt->qmt_reba_task;
+ qmt->qmt_reba_task = NULL;
+ spin_unlock(&qmt->qmt_reba_lock);
- thread_set_flags(thread, SVC_STOPPING);
- wake_up(&thread->t_ctl_waitq);
+ if (task)
+ kthread_stop(task);
- l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
- &lwi);
- }
LASSERT(list_empty(&qmt->qmt_reba_list));
}