};
int lu_tgt_pool_init(struct lu_tgt_pool *op, unsigned int count);
-int lu_tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count);
+#define lu_tgt_pool_add(op, idx, min_count) \
+ lu_tgt_pool_add_lock(op, idx, min_count, true)
+#define lu_tgt_pool_add_locked(op, idx, min_count) \
+ lu_tgt_pool_add_lock(op, idx, min_count, false)
+int lu_tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx,
+ unsigned int min_count, bool locked);
int lu_tgt_pool_remove(struct lu_tgt_pool *op, __u32 idx);
void lu_tgt_pool_free(struct lu_tgt_pool *op);
int lu_tgt_check_index(int idx, struct lu_tgt_pool *osts);
* \retval 0 if target could be added to the pool
* \retval negative error if target \a idx was not added
*/
-int lu_tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count)
+int lu_tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx,
+ unsigned int min_count, bool lock)
{
unsigned int i;
int rc = 0;
ENTRY;
- down_write(&op->op_rw_sem);
+ if (lock)
+ down_write(&op->op_rw_sem);
/* search ost in pool array */
for (i = 0; i < op->op_count; i++) {
op->op_count++;
EXIT;
out:
- up_write(&op->op_rw_sem);
+ if (lock)
+ up_write(&op->op_rw_sem);
return rc;
}
-EXPORT_SYMBOL(lu_tgt_pool_add);
+EXPORT_SYMBOL(lu_tgt_pool_add_lock);
/**
* Remove an existing pool from the system.
struct lqe_glbl_entry {
__u64 lge_qunit;
- unsigned long lge_edquot:1,
+ unsigned long lge_idx:16,
+ /* index of target */
+ lge_edquot:1,
/* true when minimum qunit is set */
lge_qunit_set:1,
/* qunit or edquot is changed - need
int idx)
{
struct lquota_entry *lqe_gl, *lqe;
- bool need_reseed = false;
- bool need_notify = false;
+ bool reseed = false;
+ bool notify = false;
int i;
lqe_gl = qti_lqes_glbl(env);
for (i = 0; i < qti_lqes_cnt(env); i++) {
lqe = qti_lqes(env)[i];
if (qunit)
- need_reseed |= qmt_adjust_qunit(env, lqe);
+ reseed |= qmt_adjust_qunit(env, lqe);
if (edquot)
- need_reseed |= qmt_adjust_edquot(lqe, now);
+ reseed |= qmt_adjust_edquot(lqe, now);
}
LASSERT(lqe_gl);
if (!lqe_gl->lqe_glbl_data &&
(req_has_rep(qb_flags) || req_is_rel(qb_flags))) {
- if (need_reseed)
+ if (reseed)
CDEBUG(D_QUOTA,
"%s: can not notify - lge_glbl_data is not set\n",
qmt->qmt_svname);
- return need_reseed;
+ return reseed;
}
- if (need_reseed || idx >= 0) {
+ if (reseed || idx >= 0) {
mutex_lock(&lqe_gl->lqe_glbl_data_lock);
if (lqe_gl->lqe_glbl_data) {
struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
- if (need_reseed) {
+ if (reseed) {
qmt_seed_glbe_all(env, lgd, qunit, edquot);
} else if (idx >= 0) {
- LASSERT(idx <= lgd->lqeg_num_used);
+ int lge_idx = qmt_map_lge_idx(lgd, idx);
+
/* If there are no locks yet when
* lge_qunit/edquot_nu is set, slaves
* are still not notified with new
* to the least qunit, but lqe_revoke_time is
* still not set.
*/
- need_notify = lgd->lqeg_arr[idx].lge_qunit_nu ||
- lgd->lqeg_arr[idx].lge_edquot_nu;
+ notify = lgd->lqeg_arr[lge_idx].lge_qunit_nu ||
+ lgd->lqeg_arr[lge_idx].lge_edquot_nu;
}
}
mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
}
- if (need_reseed || need_notify)
+ if (reseed || notify)
qmt_id_lock_notify(qmt, lqe_gl);
- return need_reseed;
+ return reseed;
}
lqe_write_unlock(qti_lqes(env)[i]);
}
-#define QMT_INIT_SLV_CNT 64
+#define QMT_INIT_SLV_CNT 64
struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *pool, int qtype)
{
struct lqe_glbl_data *lgd;
OBD_FREE(lgd, sizeof(struct lqe_glbl_data));
}
+/*
+ * Map a target index (ostidx) to its slot in lgd->lqeg_arr.
+ *
+ * Each lqe_glbl_entry records its target in lge_idx, so target numbers
+ * need not be sequential; this helper does the reverse lookup.
+ * Asserts if ostidx is not present in the array.
+ */
+int qmt_map_lge_idx(struct lqe_glbl_data *lgd, int ostidx)
+{
+ int k;
+
+ /* check common case of sequential OST numbers first */
+ if (ostidx < lgd->lqeg_num_used &&
+ lgd->lqeg_arr[ostidx].lge_idx == ostidx)
+ return ostidx;
+
+ for (k = 0; k < lgd->lqeg_num_used; k++)
+ if (lgd->lqeg_arr[k].lge_idx == ostidx)
+ break;
+
+ LASSERTF(k < lgd->lqeg_num_used, "Cannot map ostidx %d for %p\n",
+ ostidx, lgd);
+ return k;
+}
+
void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd,
bool qunit, bool edquot)
{
- struct rw_semaphore *sem = NULL;
- struct qmt_pool_info *qpi;
- int i, j, idx;
+ struct qmt_pool_info *qpi;
+ int i, j;
ENTRY;
if (!qti_lqes_cnt(env))
CDEBUG(D_QUOTA, "lqes_cnt %d, i %d\n", qti_lqes_cnt(env), i);
qpi = lqe2qpi(lqe);
- if (qmt_pool_global(qpi)) {
- slaves_cnt = qpi_slv_nr_by_rtype(lqe2qpi(lqe),
- lqe_qtype(lqe));
- } else {
- sem = qmt_sarr_rwsem(qpi);
- down_read(sem);
- slaves_cnt = qmt_sarr_count(qpi);
- }
+ qmt_sarr_read_down(qpi);
+ slaves_cnt = qmt_sarr_count(qpi);
for (j = 0; j < slaves_cnt; j++) {
- idx = qmt_sarr_get_idx(qpi, j);
- LASSERT(idx >= 0);
+ int idx, tgt_idx;
+
+ tgt_idx = qmt_sarr_get_idx(qpi, j);
+ LASSERT(tgt_idx >= 0);
+ idx = qmt_map_lge_idx(lgd, tgt_idx);
if (edquot) {
int lge_edquot, new_edquot, edquot_nu;
__u64 lge_qunit, new_qunit;
CDEBUG(D_QUOTA,
- "idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n",
- idx, lgd->lqeg_arr[idx].lge_qunit_set,
+ "tgt_idx %d idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n",
+ tgt_idx, idx,
+ lgd->lqeg_arr[idx].lge_qunit_set,
lgd->lqeg_arr[idx].lge_qunit,
lqe->lqe_qunit);
/* lge for this idx is already set
}
}
- if (!qmt_pool_global(qpi))
- up_read(sem);
+ qmt_sarr_read_up(qpi);
}
/* TODO: only for debug purposes - remove it later */
for (i = 0; i < lgd->lqeg_num_used; i++)
CDEBUG(D_QUOTA,
- "lgd ost %d, qunit %lu nu %d; edquot %d nu %d\n",
- i, (long unsigned)lgd->lqeg_arr[i].lge_qunit,
+ "lgd i %d tgt_idx %d qunit %lu nu %d; edquot %d nu %d\n",
+ i, lgd->lqeg_arr[i].lge_idx,
+ (unsigned long)lgd->lqeg_arr[i].lge_qunit,
lgd->lqeg_arr[i].lge_qunit_nu,
lgd->lqeg_arr[i].lge_edquot,
lgd->lqeg_arr[i].lge_edquot_nu);
struct lquota_entry *lqe, struct lqe_glbl_data *lgd,
int pool_type)
{
- __u64 qunit;
- bool edquot;
- int i;
+ __u64 qunit;
+ bool edquot;
+ int i;
qunit = lqe->lqe_qunit;
edquot = lqe->lqe_edquot;
/* Firstly set all elements in array with
* qunit and edquot of global pool */
+ qmt_sarr_read_down(lqe2qpi(lqe));
for (i = 0; i < lgd->lqeg_num_used; i++) {
lgd->lqeg_arr[i].lge_qunit = qunit;
lgd->lqeg_arr[i].lge_edquot = edquot;
* will be sent to slaves during qmt_lvbo_fill. */
lgd->lqeg_arr[i].lge_qunit_nu = 0;
lgd->lqeg_arr[i].lge_edquot_nu = 0;
+ lgd->lqeg_arr[i].lge_idx = qmt_sarr_get_idx(lqe2qpi(lqe), i);
}
+ qmt_sarr_read_up(lqe2qpi(lqe));
qmt_pool_lqes_lookup_spec(env, qmt, pool_type,
lqe_qtype(lqe), &lqe->lqe_id);
qmt_pool_lookup(env, qmt, type, NULL, -1, false)
#define qmt_pool_lookup_name(env, qmt, type, name) \
qmt_pool_lookup(env, qmt, type, name, -1, false)
-#define qmt_pool_lookup_arr(env, qmt, type, idx) \
- qmt_pool_lookup(env, qmt, type, NULL, idx, true)
+
+/*
+ * Until MDT pools are implemented, all MDTs belong to the
+ * global pool, so look up lqes only in the global pool for the
+ * DOM case.
+ */
+#define qmt_pool_lookup_arr(env, qmt, type, idx, stype) \
+ qmt_pool_lookup(env, qmt, type, NULL, \
+ qmt_dom(type, stype) ? -1 : idx, true)
struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
struct qmt_device *qmt,
int rtype,
int qmt_pool_rem(struct obd_device *obd, char *poolname, char *ostname);
int qmt_pool_del(struct obd_device *obd, char *poolname);
-struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi);
+#define qmt_sarr_read_down(qpi) down_read(&qpi->qpi_sarr.osts.op_rw_sem)
+#define qmt_sarr_read_up(qpi) up_read(&qpi->qpi_sarr.osts.op_rw_sem)
+#define qmt_sarr_write_down(qpi) down_write(&qpi->qpi_sarr.osts.op_rw_sem)
+#define qmt_sarr_write_up(qpi) up_write(&qpi->qpi_sarr.osts.op_rw_sem)
int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx);
unsigned int qmt_sarr_count(struct qmt_pool_info *qpi);
void qti_lqes_write_lock(const struct lu_env *env);
void qti_lqes_write_unlock(const struct lu_env *env);
+int qmt_map_lge_idx(struct lqe_glbl_data *lgd, int ostidx);
struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *, int);
void qmt_free_lqe_gd(struct lqe_glbl_data *);
void qmt_setup_lqe_gd(const struct lu_env *, struct qmt_device *,
*/
static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
{
- unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
- struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+ unsigned long least = lqe2qpi(lqe)->qpi_least_qunit;
+ bool revoke = false;
 /* There is no array to store lge for the case of DOM.
- * Ignore it until MDT pools will be ready. */
+ * Ignore it until MDT pools are ready.
+ */
 if (!qmt_dom(lqe_rtype(lqe), stype)) {
- lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
- lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
+ struct lqe_glbl_data *lgd;
+
+ /* lqe_glbl_data may be freed concurrently, so take
+ * lqe_glbl_data_lock and re-check it before use
+ */
+ mutex_lock(&lqe->lqe_glbl_data_lock);
+ lgd = lqe->lqe_glbl_data;
+ if (lgd) {
+ int lge_idx = qmt_map_lge_idx(lgd, idx);
- /* We shouldn't call revoke for DOM case, it will be updated
- * at qmt_id_lock_glimpse. */
- return (lgd->lqeg_arr[idx].lge_qunit == least_qunit);
+ lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0;
+ lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0;
+ /* We shouldn't call revoke for DOM case, it will be
+ * updated at qmt_id_lock_glimpse.
+ */
+ revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least;
+ }
+ mutex_unlock(&lqe->lqe_glbl_data_lock);
 }
- return false;
+ return revoke;
}
-static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe,
+static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl,
 int stype, int idx)
{
- unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
- struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+ unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit;
 bool notify = false;
- if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) {
- int i;
-
- qti_lqes_write_lock(env);
- for (i = 0; i < qti_lqes_cnt(env); i++) {
- LQUOTA_DEBUG(qti_lqes(env)[i],
- "idx %d lge_qunit %llu least_qunit %lu\n",
- idx, lgd->lqeg_arr[idx].lge_qunit,
- least_qunit);
- if (qti_lqes(env)[i]->lqe_qunit == least_qunit) {
- qti_lqes(env)[i]->lqe_revoke_time =
+ /* no lge array is kept for the DOM case */
+ if (qmt_dom(lqe_rtype(lqe_gl), stype))
+ return false;
+
+ /* lock ordering: lqes write lock first, then
+ * lqe_glbl_data_lock
+ */
+ qti_lqes_write_lock(env);
+ mutex_lock(&lqe_gl->lqe_glbl_data_lock);
+ if (lqe_gl->lqe_glbl_data) {
+ struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
+ int lge_idx;
+
+ lge_idx = qmt_map_lge_idx(lgd, idx);
+ if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) {
+ struct lquota_entry *lqe;
+ int i;
+
+ for (i = 0; i < qti_lqes_cnt(env); i++) {
+ lqe = qti_lqes(env)[i];
+ LQUOTA_DEBUG(lqe,
+ "lge_qunit %llu least_qunit %lu idx %d\n",
+ lgd->lqeg_arr[lge_idx].lge_qunit,
+ least_qunit, idx);
+ if (lqe->lqe_qunit == least_qunit) {
+ lqe->lqe_revoke_time =
 ktime_get_seconds();
- notify |= qmt_adjust_edquot(qti_lqes(env)[i],
+ notify |= qmt_adjust_edquot(lqe,
 ktime_get_real_seconds());
+ }
 }
 }
- qti_lqes_write_unlock(env);
 }
+ mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
+ qti_lqes_write_unlock(env);
+
 return notify;
}
GOTO(out_exp, rc);
if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
+ int notify = false;
+
mutex_lock(&lqe->lqe_glbl_data_lock);
if (lqe->lqe_glbl_data) {
qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
- qmt_id_lock_notify(qmt, lqe);
+ notify = true;
}
mutex_unlock(&lqe->lqe_glbl_data_lock);
+ if (notify)
+ qmt_id_lock_notify(qmt, lqe);
}
if (lvb->lvb_id_rel) {
qunit = lqe->lqe_qunit;
} else {
struct lqe_glbl_data *lgd;
+ int lge_idx;
mutex_lock(&lqe->lqe_glbl_data_lock);
lgd = lqe->lqe_glbl_data;
if (lgd) {
- edquot = lgd->lqeg_arr[idx].lge_edquot;
- qunit = lgd->lqeg_arr[idx].lge_qunit;
+ lge_idx = qmt_map_lge_idx(lgd, idx);
+ edquot = lgd->lqeg_arr[lge_idx].lge_edquot;
+ qunit = lgd->lqeg_arr[lge_idx].lge_qunit;
} else {
edquot = lqe->lqe_edquot;
qunit = lqe->lqe_qunit;
* broadcasting the new qunit value */
static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
{
- struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+ struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
 struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
 int idx;
 int stype = qmt_uuid2idx(uuid, &idx);
 LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
+ CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n",
+ stype, lqe_rtype(lqe), idx, uuid->uuid);
 /* Quota pools support only OSTs, despite MDTs also could be registered
 * as LQUOTA_RES_DT devices(DOM). */
 if (qmt_dom(lqe_rtype(lqe), stype))
 return 1;
- else
- return lgd ? lgd->lqeg_arr[idx].lge_edquot_nu ||
- lgd->lqeg_arr[idx].lge_qunit_nu : 0;
+
+ /* glimpse this target only if its lge still holds a qunit or
+ * edquot change that slaves have not been notified about
+ */
+ if (lgd) {
+ int lge_idx = qmt_map_lge_idx(lgd, idx);
+
+ CDEBUG(D_QUOTA,
+ "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n",
+ idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu,
+ lgd->lqeg_arr[lge_idx].lge_qunit_nu);
+ return lgd->lqeg_arr[lge_idx].lge_edquot_nu ||
+ lgd->lqeg_arr[lge_idx].lge_qunit_nu;
+ }
+
+ return 0;
}
#include "qmt_internal.h"
static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
-static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
- int idx, int min);
+/* the *_locked variant is for callers that already hold the sarr
+ * op_rw_sem (see qmt_sarr_write_down())
+ */
+#define qmt_sarr_pool_add(qpi, idx, stype) \
+ _qmt_sarr_pool_add(qpi, idx, stype, false)
+#define qmt_sarr_pool_add_locked(qpi, idx, stype) \
+ _qmt_sarr_pool_add(qpi, idx, stype, true)
+static inline int _qmt_sarr_pool_add(struct qmt_pool_info *qpi,
+ int idx, int stype, bool locked);
static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi);
static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
RETURN(rc);
}
+/* lquota_disk_for_each_slv() callback: account one slave (slv_name) in
+ * qpi->qpi_slv_nr and add its target index to the pool's slave array.
+ */
-static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
+static int qmt_slv_add(const struct lu_env *env, struct lu_fid *glb_fid,
 char *slv_name, struct lu_fid *slv_fid, void *arg)
{
 struct obd_uuid uuid;
- int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
- int stype, qtype;
+ struct qmt_pool_info *qpi = arg;
+ int stype, qtype, idx;
 int rc;
 rc = lquota_extract_fid(glb_fid, NULL, &qtype);
 LASSERT(!rc);
 obd_str2uuid(&uuid, slv_name);
- stype = qmt_uuid2idx(&uuid, NULL);
+ stype = qmt_uuid2idx(&uuid, &idx);
 if (stype < 0)
 return stype;
+
+ CDEBUG(D_QUOTA, "add new idx:%d in %s\n", idx, qpi->qpi_name);
+ /* -EEXIST: this target was already added while scanning
+ * another qtype - not an error
+ */
+ rc = qmt_sarr_pool_add(qpi, idx, stype);
+ if (rc && rc != -EEXIST) {
+ CERROR("%s: can't add idx %d into dt-0x0: rc = %d\n",
+ qpi->qpi_qmt->qmt_svname, idx, rc);
+ return rc;
+ }
+
 /* one more slave */
- (*nr)[stype][qtype]++;
+ qpi->qpi_slv_nr[stype][qtype]++;
 CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
- slv_name, stype, qtype, (*nr)[stype][qtype]);
+ slv_name, stype, qtype, qpi->qpi_slv_nr[stype][qtype]);
 return 0;
}
rc = lquota_disk_for_each_slv(env, pool->qpi_root,
&qti->qti_fid,
- qmt_slv_cnt,
- &pool->qpi_slv_nr);
+ qmt_slv_add,
+ pool);
if (rc) {
CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
qmt->qmt_svname, qtype_name(qtype), rc);
RETURN(0);
}
+/* cfs_hash iterator callback, run for every lquota_entry after a new
+ * target has been appended to the pool's slave array (sarr): grow the
+ * entry's per-target array (lqe_glbl_data->lqeg_arr) by one slot and
+ * seed it with the lqe's current qunit/edquot values.
+ *
+ * The caller holds the sarr rw_semaphore for write (qmt_sarr_write_down),
+ * so qmt_sarr_count()/qmt_sarr_get_idx() are stable here.
+ *
+ * Returns 0 on success, -ENOMEM if the array could not be extended.
+ */
+static int qmt_lgd_extend_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *data)
+{
+ struct lqe_glbl_entry *lqeg_arr, *old_lqeg_arr;
+ struct lquota_entry *lqe;
+ int old_num, rc;
+
+ lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+ rc = 0;
+
+ CDEBUG(D_QUOTA, "lgd %px\n", lqe->lqe_glbl_data);
+ if (lqe->lqe_glbl_data) {
+ struct lqe_glbl_data *lgd;
+
+ /* initialize both: old_num is read below even when no
+ * reallocation takes place
+ */
+ old_lqeg_arr = NULL;
+ old_num = 0;
+ mutex_lock(&lqe->lqe_glbl_data_lock);
+ /* re-check under the mutex - lqe_glbl_data may have
+ * been freed concurrently
+ */
+ if (lqe->lqe_glbl_data) {
+ struct qmt_pool_info *qpi = lqe2qpi(lqe);
+ int sarr_cnt = qmt_sarr_count(qpi);
+
+ lgd = lqe->lqe_glbl_data;
+ if (lgd->lqeg_num_alloc < sarr_cnt) {
+ /* only one target is added at a time */
+ LASSERT((lgd->lqeg_num_alloc + 1) == sarr_cnt);
+
+ /* grow the array in chunks of 16 entries */
+ OBD_ALLOC(lqeg_arr,
+ sizeof(struct lqe_glbl_entry) *
+ (lgd->lqeg_num_alloc + 16));
+ if (lqeg_arr) {
+ memcpy(lqeg_arr, lgd->lqeg_arr,
+ sizeof(struct lqe_glbl_entry) *
+ (lgd->lqeg_num_alloc));
+ old_lqeg_arr = lgd->lqeg_arr;
+ old_num = lgd->lqeg_num_alloc;
+ lgd->lqeg_arr = lqeg_arr;
+ lgd->lqeg_num_alloc += 16;
+ CDEBUG(D_QUOTA,
+ "extend lqeg_arr:%px from %d to %d\n",
+ lgd, old_num,
+ lgd->lqeg_num_alloc);
+ } else {
+ CERROR("%s: cannot allocate new lqeg_arr: rc = %d\n",
+ qpi->qpi_qmt->qmt_svname,
+ -ENOMEM);
+ GOTO(out, rc = -ENOMEM);
+ }
+ }
+ /* seed the new slot with the just-added target
+ * (the last one in sarr) and the lqe's current
+ * settings; nothing to notify yet
+ */
+ lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx =
+ qmt_sarr_get_idx(qpi, sarr_cnt - 1);
+ lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot =
+ lqe->lqe_edquot;
+ lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit =
+ lqe->lqe_qunit;
+ lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot_nu = 0;
+ lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit_nu = 0;
+ LQUOTA_DEBUG(lqe, "add tgt idx:%d used %d alloc %d\n",
+ lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx,
+ lgd->lqeg_num_used, lgd->lqeg_num_alloc);
+ lgd->lqeg_num_used++;
+ }
+out:
+ mutex_unlock(&lqe->lqe_glbl_data_lock);
+ /* free the replaced array outside the mutex; skip the
+ * free entirely when nothing was reallocated so that
+ * old_num is never used uninitialized
+ */
+ if (old_lqeg_arr)
+ OBD_FREE(old_lqeg_arr,
+ old_num * sizeof(struct lqe_glbl_entry));
+ }
+
+ return rc;
+}
+
/*
* Handle new slave connection. Called when a slave enqueues the global quota
* lock at the beginning of the reintegration procedure.
stype = qmt_uuid2idx(uuid, &idx);
if (stype < 0)
RETURN(stype);
+ CDEBUG(D_QUOTA, "FID "DFID"\n", PFID(glb_fid));
/* extract pool info from global index FID */
rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
if (rc)
RETURN(rc);
- /* look-up pool in charge of this global index FID */
- qti_pools_init(env);
- pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
+ pool = qmt_pool_lookup_glb(env, qmt, pool_type);
if (IS_ERR(pool))
RETURN(PTR_ERR(pool));
memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
*slv_ver = dt_version_get(env, slv_obj);
dt_object_put(env, slv_obj);
- if (created)
+ if (created) {
+ struct qmt_pool_info *ptr;
+
+ CDEBUG(D_QUOTA, "add tgt idx:%d pool_type:%d qtype:%d stype:%d\n",
+ idx, pool_type, qtype, stype);
+
+ if (!qmt_dom(qtype, stype)) {
+ qmt_sarr_write_down(pool);
+ rc = qmt_sarr_pool_add_locked(pool, idx, stype);
+ if (!rc) {
+ for (i = 0; i < LL_MAXQUOTAS; i++)
+ cfs_hash_for_each(pool->qpi_site[i]->
+ lqs_hash,
+ qmt_lgd_extend_cb,
+ &env);
+ } else if (rc == -EEXIST) {
+ /* This target has already been added
+ * by another qtype
+ */
+ rc = 0;
+ }
+ qmt_sarr_write_up(pool);
+
+ if (rc) {
+ CERROR("%s: cannot add idx:%d to pool %s: rc = %d\n",
+ qmt->qmt_svname, idx,
+ pool->qpi_name, rc);
+ GOTO(out, rc);
+ }
+ }
+
+ /* look-up pool in charge of this global index FID */
+ qti_pools_init(env);
+ ptr = qmt_pool_lookup_arr(env, qmt, pool_type, idx, stype);
+ if (IS_ERR(ptr))
+ GOTO(out, rc = PTR_ERR(ptr));
+
for (i = 0; i < qti_pools_cnt(env); i++)
qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
+
+ qti_pools_fini(env);
+ }
+
out:
- qti_pools_fini(env);
+ qpi_putref(env, pool);
RETURN(rc);
}
int rc, i;
ENTRY;
- /* Until MDT pools are not emplemented, all MDTs belong to
- * global pool, thus lookup lqes only from global pool. */
- if (qmt_dom(rtype, stype))
- idx = -1;
-
qti_pools_init(env);
rc = 0;
/* look-up pool responsible for this global index FID */
- pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
+ pool = qmt_pool_lookup_arr(env, qmt, rtype, idx, stype);
if (IS_ERR(pool)) {
qti_pools_fini(env);
RETURN(PTR_ERR(pool));
static int qmt_pool_recalc(void *args)
{
struct qmt_pool_info *pool, *glbl_pool;
- struct rw_semaphore *sem = NULL;
struct obd_device *obd;
struct lu_env env;
int i, rc, qtype, slaves_cnt;
+ bool sem = false;
ENTRY;
pool = args;
obd = qmt_get_mgc(pool->qpi_qmt);
if (IS_ERR(obd))
GOTO(out, rc = PTR_ERR(obd));
- else
- /* Waiting for the end of processing mgs config.
- * It is needed to be sure all pools are configured. */
- while (obd->obd_process_conf)
- schedule_timeout_uninterruptible(cfs_time_seconds(1));
+
+ /* Waiting for the end of processing mgs config.
+ * It is needed to be sure all pools are configured.
+ */
+ while (obd->obd_process_conf)
+ schedule_timeout_uninterruptible(cfs_time_seconds(1));
CFS_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val);
- sem = qmt_sarr_rwsem(pool);
- LASSERT(sem);
- down_read(sem);
+ qmt_sarr_read_down(pool);
+ sem = true;
/* Hold this to be sure that OSTs from this pool
* can't do acquire/release.
*
* Thus until up_read, no one can restart recalc thread.
*/
if (sem) {
- up_read(sem);
+ qmt_sarr_read_up(pool);
up_write(&pool->qpi_recalc_sem);
}
GOTO(out, rc = PTR_ERR(qpi));
}
- rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
+ rc = add ? qmt_sarr_pool_add(qpi, idx, QMT_STYPE_OST) :
qmt_sarr_pool_rem(qpi, idx);
if (rc) {
/* message is checked in sanity-quota test_1b */
static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
{
-
- /* No need to initialize sarray for global pool
- * as it always includes all slaves */
- if (qmt_pool_global(qpi))
- return 0;
-
- switch (qpi->qpi_rtype) {
- case LQUOTA_RES_DT:
- return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
- case LQUOTA_RES_MD:
- default:
- return 0;
- }
+ /* every pool, including the global one, now keeps its slave
+ * indexes in qpi_sarr.osts
+ */
+ return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
}
+/* add target idx to the pool's slave array; 'locked' tells whether the
+ * caller already holds op_rw_sem; 32 is the minimum array allocation
+ */
-static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
+static inline int
+_qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int stype, bool locked)
{
- switch (qpi->qpi_rtype) {
- case LQUOTA_RES_DT:
- return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
- case LQUOTA_RES_MD:
- default:
+ /* We don't have an array for DOM */
+ if (qmt_dom(qpi->qpi_rtype, stype))
 return 0;
- }
+
+ if (locked)
+ return lu_tgt_pool_add_locked(&qpi->qpi_sarr.osts, idx, 32);
+ else
+ return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, 32);
}
+/* drop target idx from the pool's slave array */
static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
{
- switch (qpi->qpi_rtype) {
- case LQUOTA_RES_DT:
- return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
- case LQUOTA_RES_MD:
- default:
- return 0;
- }
+ return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
}
static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi)
{
- if (qmt_pool_global(qpi))
- return;
-
- switch (qpi->qpi_rtype) {
- case LQUOTA_RES_DT:
- if (qpi->qpi_sarr.osts.op_array)
- lu_tgt_pool_free(&qpi->qpi_sarr.osts);
- return;
- case LQUOTA_RES_MD:
- default:
- return;
- }
+ /* op_array may still be NULL here - free only if allocated */
+ if (qpi->qpi_sarr.osts.op_array)
+ lu_tgt_pool_free(&qpi->qpi_sarr.osts);
}
static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
if (qmt_pool_global(qpi))
return 0;
- switch (qpi->qpi_rtype) {
- case LQUOTA_RES_DT:
- return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
- case LQUOTA_RES_MD:
- default:
- return 0;
- }
-}
-
-struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
-{
- switch (qpi->qpi_rtype) {
- case LQUOTA_RES_DT:
- /* to protect ost_pool use */
- return &qpi->qpi_sarr.osts.op_rw_sem;
- case LQUOTA_RES_MD:
- default:
- return NULL;
- }
+ return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
}
int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
{
-
- if (qmt_pool_global(qpi))
- return arr_idx;
-
- switch (qpi->qpi_rtype) {
- case LQUOTA_RES_DT:
- LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
- "idx invalid %d op_count %d\n", arr_idx,
- qpi->qpi_sarr.osts.op_count);
- return qpi->qpi_sarr.osts.op_array[arr_idx];
- case LQUOTA_RES_MD:
- default:
- return -EINVAL;
- }
+ /* translate a position in the slave array into the target
+ * index stored there; used for the global pool as well now
+ */
+ LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
+ "idx invalid %d op_count %d\n", arr_idx,
+ qpi->qpi_sarr.osts.op_count);
+ return qpi->qpi_sarr.osts.op_array[arr_idx];
}
/* Number of slaves in a pool */
unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
{
- switch (qpi->qpi_rtype) {
- case LQUOTA_RES_DT:
- return qpi->qpi_sarr.osts.op_count;
- case LQUOTA_RES_MD:
- default:
- return -EINVAL;
- }
+ /* sarr is now initialized for every pool/resource type */
+ return qpi->qpi_sarr.osts.op_count;
}
}
run_test 33b "Drop cancel during umount"
+test_33c() {
+ (( MDS1_VERSION >= $(version_code 2.15.57) )) ||
+ skip "Need MDS version at least 2.15.57"
+ local MDSDEV=$(mdsdevname ${SINGLEMDS//mds/})
+ local tstid=${TSTID:-"$(id -u $TSTUSR)"}
+ local mkfsoptions
+ local qpool="qpool1"
+
+ [ -n "$ost1_HOST" ] && fs2ost_HOST=$ost1_HOST
+
+ local fs2mdsdev=$(mdsdevname 1_2)
+ local fs2ostdev=$(ostdevname 1_2)
+ local fs2mdsvdev=$(mdsvdevname 1_2)
+ local fs2ostvdev=$(ostvdevname 1_2)
+
+ if [ "$mds1_FSTYPE" == ldiskfs ]; then
+ mkfsoptions="--mkfsoptions=\\\"-J size=8\\\"" # See bug 17931.
+ fi
+
+ if combined_mgs_mds; then
+ local mgs_flag="--mgs"
+ fi
+
+ load_modules
+ stack_trap unload_modules_conf
+
+ add fs2mds $(mkfs_opts mds1 ${fs2mdsdev}) --fsname=${FSNAME} \
+ --reformat $mgs_flag $mkfsoptions $fs2mdsdev $fs2mdsvdev ||
+ exit 10
+ # OST index 0x7c6 (1990) is deliberately large and
+ # non-sequential to exercise target index mapping
+ add fs2ost $(mkfs_opts ost1 ${fs2ostdev}) --mgsnode=$MGSNID \
+ --fsname=${FSNAME} --index=0x7c6 --reformat $fs2ostdev \
+ $fs2ostvdev || exit 10
+
+
+ start fs2mds $fs2mdsdev $MDS_MOUNT_OPTS
+ stack_trap "stop fs2mds -f"
+ start fs2ost $fs2ostdev $OST_MOUNT_OPTS
+ stack_trap "stop fs2ost -f"
+
+ mount_client $MOUNT || error "client start failed"
+ stack_trap "umount_client $MOUNT"
+ mkdir_on_mdt0 $DIR/$tdir || error "cannot create $DIR/$tdir"
+ chmod 0777 $DIR/$tdir || error "chown failed"
+ if [[ $PERM_CMD == *"set_param -P"* ]]; then
+ do_facet mgs $PERM_CMD \
+ osd-*.$FSNAME-OST*.quota_slave.enable=$QUOTA_TYPE
+ else
+ do_facet mgs $PERM_CMD $FSNAME.quota.ost=$QUOTA_TYPE ||
+ error "set ost quota type failed"
+ fi
+
+ local old_MDSCOUNT=$MDSCOUNT
+ MDSCOUNT=1
+ stack_trap "MDSCOUNT=$old_MDSCOUNT"
+
+ # put the high-index OST into a quota pool
+ pool_add $qpool || error "pool_add failed"
+ pool_add_targets $qpool 0x7c6
+
+ $LFS setquota -u $tstid -B20M -b 0 $MOUNT
+ $LFS setquota -g $tstid -B20M -b 0 $MOUNT
+ $LFS setquota -u $tstid -B20M -b 0 --pool $qpool $MOUNT
+ $LFS setquota -g $tstid -B20M -b 0 --pool $qpool $MOUNT
+
+ # repeatedly write 30M against the 20M hard limit as the
+ # quota user on the large-index OST
+ for i in {1..10}; do
+ runas -u $tstid -g $tstid dd if=/dev/zero of=$DIR/$tdir/f1 \
+ bs=1M count=30 oflag=direct
+ sleep 3
+ rm -f $DIR/$tdir/f1
+ done
+
+ destroy_pools
+ #umount_client $MOUNT || error "client start failed"
+ #stop fs2ost -f
+ #stop fs2mds -f
+ #cleanup_nocli || error "cleanup_nocli failed with $?"
+}
+
test_34a() {
setup
do_facet client "bash runmultiop_bg_pause $DIR/file O_c"
if [ -z $last ]; then
local list=$first
+ last=$first
else
local list=$(seq $first $step $last)
fi