* GPL HEADER END
*/
/*
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
* Use is subject to license terms.
*
* Author: Johann Lombardi <johann.lombardi@intel.com>
*/
/*
- * A Quota Master Target has a hash table where it stores qmt_pool_info
+ * A Quota Master Target has a list(qmt_pool_list) where it stores qmt_pool_info
* structures. There is one such structure for each pool managed by the QMT.
*
* Each pool can have different quota types enforced (typically user & group
#include <lprocfs_status.h>
#include "qmt_internal.h"
-static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
+ int idx, int min);
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
+static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi);
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
/*
* Static helper functions not used outside the scope of this file
*/
-/*
- * Reference counter management for qmt_pool_info structures
- */
-static inline void qpi_getref(struct qmt_pool_info *pool)
-{
- atomic_inc(&pool->qpi_ref);
-}
-
-static inline void qpi_putref(const struct lu_env *env,
- struct qmt_pool_info *pool)
-{
- LASSERT(atomic_read(&pool->qpi_ref) > 0);
- if (atomic_dec_and_test(&pool->qpi_ref))
- qmt_pool_free(env, pool);
-}
-
static inline void qpi_putref_locked(struct qmt_pool_info *pool)
{
LASSERT(atomic_read(&pool->qpi_ref) > 1);
atomic_dec(&pool->qpi_ref);
}
-/*
- * Hash functions for qmt_pool_info management
- */
-
-static unsigned
-qpi_hash_hash(struct cfs_hash *hs, const void *key, unsigned mask)
-{
- return cfs_hash_u32_hash(*((__u32 *)key), mask);
-}
-
-static void *qpi_hash_key(struct hlist_node *hnode)
-{
- struct qmt_pool_info *pool;
- pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
- return &pool->qpi_key;
-}
-
-static int qpi_hash_keycmp(const void *key, struct hlist_node *hnode)
-{
- struct qmt_pool_info *pool;
- pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
- return pool->qpi_key == *((__u32 *)key);
-}
-
-static void *qpi_hash_object(struct hlist_node *hnode)
-{
- return hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
-}
-
-static void qpi_hash_get(struct cfs_hash *hs, struct hlist_node *hnode)
-{
- struct qmt_pool_info *pool;
- pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
- qpi_getref(pool);
-}
-
-static void qpi_hash_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
-{
- struct qmt_pool_info *pool;
- pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
- qpi_putref_locked(pool);
-}
-
-static void qpi_hash_exit(struct cfs_hash *hs, struct hlist_node *hnode)
-{
- CERROR("Should not have any item left!\n");
-}
-
-/* vector of hash operations */
-static struct cfs_hash_ops qpi_hash_ops = {
- .hs_hash = qpi_hash_hash,
- .hs_key = qpi_hash_key,
- .hs_keycmp = qpi_hash_keycmp,
- .hs_object = qpi_hash_object,
- .hs_get = qpi_hash_get,
- .hs_put_locked = qpi_hash_put_locked,
- .hs_exit = qpi_hash_exit
-};
-
/* some procfs helpers */
static int qpi_state_seq_show(struct seq_file *m, void *data)
{
" type: %s\n"
" ref: %d\n"
" least qunit: %lu\n",
- pool->qpi_key & 0x0000ffff,
- RES_NAME(pool->qpi_key >> 16),
+ 0,
+ RES_NAME(pool->qpi_rtype),
atomic_read(&pool->qpi_ref),
pool->qpi_least_qunit);
- for (type = 0; type < MAXQUOTAS; type++)
+ for (type = 0; type < LL_MAXQUOTAS; type++)
seq_printf(m, " %s:\n"
" #slv: %d\n"
" #lqe: %d\n",
- QTYPE_NAME(type),
- pool->qpi_slv_nr[type],
+ qtype_name(type),
+ qpi_slv_nr(pool, type),
atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
return 0;
}
LPROC_SEQ_FOPS_RO(qpi_state);
+/* Procfs show handler: print this pool's soft least qunit value. */
+static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
+{
+ struct qmt_pool_info *pool = m->private;
+ LASSERT(pool != NULL);
+
+ seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
+ return 0;
+}
+
+/*
+ * Procfs write handler: tune the pool's soft least qunit.
+ *
+ * The value is rounded up to the next power-of-4 multiple of the pool's
+ * least qunit (matching how qmt_adjust_qunit() scales qunit), and is
+ * only meaningful for data (block) pools.
+ *
+ * \retval - \a count on success, negative errno on failure
+ */
+static ssize_t
+qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
+			       size_t count, loff_t *off)
+{
+	struct seq_file *m = file->private_data;
+	struct qmt_pool_info *pool = m->private;
+	long long least_qunit, qunit;
+	int rc;
+
+	LASSERT(pool != NULL);
+
+	/* Not tuneable for inode limit */
+	if (pool->qpi_rtype != LQUOTA_RES_DT)
+		return -EINVAL;
+
+	rc = kstrtoll_from_user(buffer, count, 0, &least_qunit);
+	if (rc)
+		return rc;
+
+	/* Minimal qpi_soft_least_qunit */
+	qunit = (long long)pool->qpi_least_qunit << 2;
+	/* The value must be a power-of-4 multiple of the minimal
+	 * qpi_soft_least_qunit, see how the qunit is adjusted in
+	 * qmt_adjust_qunit(). Compute in long long: the previous int
+	 * arithmetic relied on signed shift overflow (undefined
+	 * behaviour) to terminate the loop. */
+	while (qunit < least_qunit && qunit <= INT_MAX)
+		qunit <<= 2;
+	if (qunit > INT_MAX)
+		qunit = INT_MAX & ~3;
+
+	pool->qpi_soft_least_qunit = qunit;
+	return count;
+}
+LPROC_SEQ_FOPS(qpi_soft_least_qunit);
+
static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
{ .name = "info",
.fops = &qpi_state_fops },
+ { .name = "soft_least_qunit",
+ .fops = &qpi_soft_least_qunit_fops },
{ NULL }
};
/*
- * Allocate a new qmt_pool_info structure and add it to the pool hash table
- * of the qmt.
+ * Allocate a new qmt_pool_info structure and add it to qmt_pool_list.
*
* \param env - is the environment passed by the caller
* \param qmt - is the quota master target
- * \param pool_id - is the 16-bit pool identifier of the new pool to add
* \param pool_type - is the resource type of this pool instance, either
* LQUOTA_RES_MD or LQUOTA_RES_DT.
*
* \retval - 0 on success, appropriate error on failure
*/
static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
- int pool_id, int pool_type)
+ char *pool_name, int pool_type)
{
struct qmt_thread_info *qti = qmt_info(env);
struct qmt_pool_info *pool;
if (pool == NULL)
RETURN(-ENOMEM);
INIT_LIST_HEAD(&pool->qpi_linkage);
+ init_rwsem(&pool->qpi_recalc_sem);
- /* assign key used by hash functions */
- pool->qpi_key = pool_id + (pool_type << 16);
+ pool->qpi_rtype = pool_type;
/* initialize refcount to 1, hash table will then grab an additional
* reference */
/* set up least qunit size to use for this pool */
pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
+ if (pool_type == LQUOTA_RES_DT)
+ pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
+ else
+ pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
+
+ /* grab reference on master target that this pool belongs to */
+ lu_device_get(qmt2lu_dev(qmt));
+ lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
+ pool->qpi_qmt = qmt;
/* create pool proc directory */
- sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
+ snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
+ RES_NAME(pool_type), pool_name);
+ strncpy(pool->qpi_name, pool_name, QPI_MAXNAME);
pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
lprocfs_quota_qpi_vars, pool);
if (IS_ERR(pool->qpi_proc)) {
GOTO(out, rc);
}
- /* grab reference on master target that this pool belongs to */
- lu_device_get(qmt2lu_dev(qmt));
- lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
- pool->qpi_qmt = qmt;
-
- /* add to qmt hash table */
- rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key,
- &pool->qpi_hash);
- if (rc) {
- CERROR("%s: failed to add pool %s to qmt hash (%d)\n",
- qmt->qmt_svname, qti->qti_buf, rc);
+ rc = qmt_sarr_pool_init(pool);
+ if (rc)
GOTO(out, rc);
- }
/* add to qmt pool list */
+ down_write(&qmt->qmt_pool_lock);
list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
+ up_write(&qmt->qmt_pool_lock);
EXIT;
out:
if (rc)
* \param env - is the environment passed by the caller
* \param pool - is the qmt_pool_info structure to free
*/
-static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
+void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
{
+ struct qmt_device *qmt = pool->qpi_qmt;
int qtype;
ENTRY;
+ /* remove from list */
+ down_write(&qmt->qmt_pool_lock);
+ list_del_init(&pool->qpi_linkage);
+ up_write(&qmt->qmt_pool_lock);
+
+ if (atomic_read(&pool->qpi_ref) > 0)
+ RETURN_EXIT;
+
+ qmt_stop_pool_recalc(pool);
+ qmt_sarr_pool_free(pool);
+
/* release proc entry */
if (pool->qpi_proc) {
lprocfs_remove(&pool->qpi_proc);
/* release per-quota type site used to manage quota entries as well as
* references to global index files */
- for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+ for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
/* release lqe storing grace time */
if (pool->qpi_grace_lqe[qtype] != NULL)
lqe_putref(pool->qpi_grace_lqe[qtype]);
/* release reference to global index */
if (pool->qpi_glb_obj[qtype] != NULL &&
!IS_ERR(pool->qpi_glb_obj[qtype]))
- lu_object_put(env, &pool->qpi_glb_obj[qtype]->do_lu);
+ dt_object_put(env, pool->qpi_glb_obj[qtype]);
}
/* release reference on pool directory */
if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
- lu_object_put(env, &pool->qpi_root->do_lu);
+ dt_object_put(env, pool->qpi_root);
/* release reference on the master target */
if (pool->qpi_qmt != NULL) {
struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
- lu_device_put(ld);
lu_ref_del(&ld->ld_reference, "pool", pool);
+ lu_device_put(ld);
pool->qpi_qmt = NULL;
}
OBD_FREE_PTR(pool);
}
+/* Reset the per-thread pool collection: no pools gathered yet and the
+ * inline qti_pools_small array (QMT_MAX_POOL_NUM slots) is current. */
+static inline void qti_pools_init(const struct lu_env *env)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+
+ qti->qti_pools_cnt = 0;
+ qti->qti_pools_num = QMT_MAX_POOL_NUM;
+}
+
+/* Select the live backing array: once qti_pools_num has grown beyond
+ * QMT_MAX_POOL_NUM the heap-allocated qti_pools is in use, otherwise
+ * the small inline array embedded in qmt_thread_info. */
+#define qti_pools(qti) (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
+ qti->qti_pools : qti->qti_pools_small)
+#define qti_pools_env(env) \
+ (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
+ qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
+#define qti_pools_cnt(env) (qmt_info(env)->qti_pools_cnt)
+
+/*
+ * Add a pool to the per-thread pool array, taking a reference on it and
+ * read-locking its qpi_recalc_sem (both released in qti_pools_fini).
+ * The global pool, if present, is always kept at index 0.
+ *
+ * \retval - 0 on success, -ENOMEM if the array could not be grown
+ */
+static inline int qti_pools_add(const struct lu_env *env,
+				struct qmt_pool_info *qpi)
+{
+	struct qmt_thread_info *qti = qmt_info(env);
+	struct qmt_pool_info **pools;
+
+	pools = qti_pools(qti);
+	LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
+		 "Forgot init? %p\n", qti);
+
+	/* Grow the array when it is full. Note: the test must be '>=';
+	 * with '>' the store at pools[qti_pools_cnt++] below could write
+	 * one slot past the end when cnt == num. */
+	if (qti->qti_pools_cnt >= qti->qti_pools_num) {
+		OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
+		if (!pools)
+			return -ENOMEM;
+		memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
+		/* Don't need to free, if it is the very 1st allocation */
+		if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
+			OBD_FREE(qti->qti_pools,
+				 qti->qti_pools_num * sizeof(qpi));
+		qti->qti_pools = pools;
+		qti->qti_pools_num *= 2;
+	}
+
+	qpi_getref(qpi);
+	/* Take this to protect pool's lqes against changing by
+	 * recalculation thread. This would be unlocked at
+	 * qti_pools_fini. */
+	down_read(&qpi->qpi_recalc_sem);
+	if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
+		pools[qti->qti_pools_cnt++] = pools[0];
+		/* Store global pool always at index 0 */
+		pools[0] = qpi;
+	} else {
+		pools[qti->qti_pools_cnt++] = qpi;
+	}
+
+	CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
+	       qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
+
+	return 0;
+}
+
+/*
+ * Release everything collected by qti_pools_add: drop the recalc
+ * semaphore and the reference of each pool, and free the heap array if
+ * it was grown beyond the inline one.
+ */
+static inline void qti_pools_fini(const struct lu_env *env)
+{
+	struct qmt_thread_info *qti = qmt_info(env);
+	struct qmt_pool_info **pools;
+	int i;
+
+	/* An empty array is legal: a failed or empty lookup (e.g. empty
+	 * qmt_pool_list) reaches fini before any pool was added, so do
+	 * not assert cnt > 0 here. */
+	LASSERT(qti->qti_pools_cnt >= 0);
+
+	pools = qti_pools(qti);
+	for (i = 0; i < qti->qti_pools_cnt; i++) {
+		up_read(&pools[i]->qpi_recalc_sem);
+		qpi_putref(env, pools[i]);
+	}
+
+	if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
+		OBD_FREE(qti->qti_pools,
+			 qti->qti_pools_num * sizeof(struct qmt_pool_info *));
+}
+
/*
- * Look-up a pool in the hash table based on the pool ID and type.
+ * Look-up a pool in a list based on the type.
*
- * \param env - is the environment passed by the caller
- * \param qmt - is the quota master target
- * \param pool_id - is the 16-bit identifier of the pool to look up
- * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
* LQUOTA_RES_DT.
+ * \param pool_name - is the pool name to search for
+ * \param idx - OST or MDT index to search for. When it is >= 0, function
+ * returns array with pointers to all pools that include
+ * targets with requested index.
+ * \param add - add to qti_pool_arr if true
*/
-static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
+struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
struct qmt_device *qmt,
- int pool_id, int pool_type)
+ int rtype,
+ char *pool_name,
+ int idx, bool add)
{
- struct qmt_pool_info *pool;
- __u32 key;
+ struct qmt_pool_info *pos, *pool;
+ int rc;
ENTRY;
- LASSERT(qmt->qmt_pool_hash != NULL);
-
- /* look-up pool in hash table */
- key = pool_id + (pool_type << 16);
- pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key);
- if (pool == NULL) {
- /* this qmt isn't managing this pool! */
- CERROR("%s: looking up quota entry for a pool (0x%x/%d) which "
- "isn't managed by this quota master target\n",
- qmt->qmt_svname, pool_id, pool_type);
+ down_read(&qmt->qmt_pool_lock);
+ if (list_empty(&qmt->qmt_pool_list)) {
+ up_read(&qmt->qmt_pool_lock);
RETURN(ERR_PTR(-ENOENT));
}
- RETURN(pool);
+
+ CDEBUG(D_QUOTA, "type %d name %p index %d\n",
+ rtype, pool_name, idx);
+ /* Now just find a pool with correct type in a list. Further we need
+ * to go through the list and find a pool that includes requested OST
+ * or MDT. Possibly this would return a list of pools that includes
+ * needed target(OST/MDT). */
+ pool = NULL;
+ if (idx == -1 && !pool_name)
+ pool_name = GLB_POOL_NAME;
+
+ list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
+ if (pos->qpi_rtype != rtype)
+ continue;
+
+ if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
+ rc = qti_pools_add(env, pos);
+ if (rc)
+ GOTO(out_err, rc);
+ continue;
+ }
+
+ if (pool_name && !strncmp(pool_name, pos->qpi_name,
+ LOV_MAXPOOLNAME)) {
+ pool = pos;
+ if (add) {
+ rc = qti_pools_add(env, pos);
+ if (rc)
+ GOTO(out_err, rc);
+ } else {
+ qpi_getref(pool);
+ }
+ break;
+ }
+ }
+ up_read(&qmt->qmt_pool_lock);
+
+ if (idx >= 0 && qti_pools_cnt(env))
+ pool = qti_pools_env(env)[0];
+
+ RETURN(pool ? : ERR_PTR(-ENOENT));
+out_err:
+ CERROR("%s: cannot add pool %s: err = %d\n",
+ qmt->qmt_svname, pos->qpi_name, rc);
+ RETURN(ERR_PTR(rc));
}
/*
*/
/*
- * Destroy all pools which are still in the hash table and free the pool
- * hash table.
+ * Destroy all pools which are still in the pool list.
*
* \param env - is the environment passed by the caller
* \param qmt - is the quota master target
*/
void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
{
- struct qmt_pool_info *pool;
- struct list_head *pos, *n;
+ struct qmt_pool_info *pool, *tmp;
ENTRY;
- if (qmt->qmt_pool_hash == NULL)
- RETURN_EXIT;
-
/* parse list of pool and destroy each element */
- list_for_each_safe(pos, n, &qmt->qmt_pool_list) {
- pool = list_entry(pos, struct qmt_pool_info,
- qpi_linkage);
- /* remove from hash */
- cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key,
- &pool->qpi_hash);
-
- /* remove from list */
- list_del_init(&pool->qpi_linkage);
-
+ list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
/* release extra reference taken in qmt_pool_alloc */
qpi_putref(env, pool);
}
LASSERT(list_empty(&qmt->qmt_pool_list));
- cfs_hash_putref(qmt->qmt_pool_hash);
- qmt->qmt_pool_hash = NULL;
EXIT;
}
* pool which are instantiated in this function.
*
* \param env - is the environment passed by the caller
- * \param qmt - is the quota master target for which we have to initializa the
+ * \param qmt - is the quota master target for which we have to initialize the
* pool configuration
*
* \retval - 0 on success, appropriate error on failure
int res, rc = 0;
ENTRY;
- /* initialize pool hash table */
- qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH",
- HASH_POOLS_CUR_BITS,
- HASH_POOLS_MAX_BITS,
- HASH_POOLS_BKT_BITS, 0,
- CFS_HASH_MIN_THETA,
- CFS_HASH_MAX_THETA,
- &qpi_hash_ops,
- CFS_HASH_DEFAULT);
- if (qmt->qmt_pool_hash == NULL) {
- CERROR("%s: failed to create pool hash table\n",
- qmt->qmt_svname);
- RETURN(-ENOMEM);
- }
-
- /* initialize pool list */
INIT_LIST_HEAD(&qmt->qmt_pool_list);
+ init_rwsem(&qmt->qmt_pool_lock);
- /* Instantiate pool master for the default data and metadata pool (both
- * have pool ID equals to 0).
+ /* Instantiate pool master for the default data and metadata pool.
* This code will have to be revisited once we support quota on
* non-default pools */
for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
- rc = qmt_pool_alloc(env, qmt, 0, res);
+ rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
if (rc)
break;
}
static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
char *slv_name, struct lu_fid *slv_fid, void *arg)
{
- int *nr = arg;
-
+ struct obd_uuid uuid;
+ int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
+ int stype, qtype;
+ int rc;
+
+ rc = lquota_extract_fid(glb_fid, NULL, &qtype);
+ LASSERT(!rc);
+
+ obd_str2uuid(&uuid, slv_name);
+ stype = qmt_uuid2idx(&uuid, NULL);
+ if (stype < 0)
+ return stype;
/* one more slave */
- (*nr)++;
+ (*nr)[stype][qtype]++;
+ CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
+ slv_name, stype, qtype, (*nr)[stype][qtype]);
return 0;
}
* Set up on-disk index files associated with each pool.
*
* \param env - is the environment passed by the caller
- * \param qmt - is the quota master target for which we have to initializa the
+ * \param qmt - is the quota master target for which we have to initialize the
* pool configuration
* \param qmt_root - is the on-disk directory created for the QMT.
+ * \param name - is the pool name that we need to setup. Setup all pools
+ * in qmt_pool_list when name is NULL.
*
* \retval - 0 on success, appropriate error on failure
*/
int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
- struct dt_object *qmt_root)
+ struct dt_object *qmt_root, char *name)
{
struct qmt_thread_info *qti = qmt_info(env);
struct lquota_glb_rec *rec = &qti->qti_glb_rec;
struct dt_device *dev = NULL;
dt_obj_version_t version;
struct list_head *pos;
- int rc = 0, qtype;
+ int rc = 0, i, qtype;
ENTRY;
- LASSERT(qmt->qmt_pool_hash != NULL);
-
- /* iterate over each pool in the hash and allocate a quota site for each
+ /* iterate over each pool in the list and allocate a quota site for each
* one. This involves creating a global index file on disk */
list_for_each(pos, &qmt->qmt_pool_list) {
struct dt_object *obj;
- int pool_type, pool_id;
struct lquota_entry *lqe;
+ char *pool_name;
+ int rtype;
pool = list_entry(pos, struct qmt_pool_info,
qpi_linkage);
- pool_id = pool->qpi_key & 0x0000ffff;
- pool_type = pool->qpi_key >> 16;
+ pool_name = pool->qpi_name;
+ if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
+ continue;
+ rtype = pool->qpi_rtype;
if (dev == NULL)
dev = pool->qpi_qmt->qmt_child;
/* allocate directory for this pool */
- sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
+ snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
+ RES_NAME(rtype), pool_name);
obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
qti->qti_buf);
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
pool->qpi_root = obj;
- for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+ for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
/* Generating FID of global index in charge of storing
* settings for this quota type */
- lquota_generate_fid(&qti->qti_fid, pool_id, pool_type,
- qtype);
+ lquota_generate_fid(&qti->qti_fid, rtype, qtype);
/* open/create the global index file for this quota
- * type */
+ * type. If name is set, it means we came here from
+ * qmt_pool_new and can create glb index with a
+ * local generated FID. */
obj = lquota_disk_glb_find_create(env, dev,
pool->qpi_root,
- &qti->qti_fid, false);
+ &qti->qti_fid,
+ name ? true : false);
if (IS_ERR(obj)) {
rc = PTR_ERR(obj);
- CERROR("%s: failed to create glb index copy for"
- " %s type (%d)\n", qmt->qmt_svname,
- QTYPE_NAME(qtype), rc);
+ CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
+ qmt->qmt_svname, qtype_name(qtype), rc);
RETURN(rc);
}
rec->qbr_hardlimit = 0;
rec->qbr_softlimit = 0;
rec->qbr_granted = 0;
- rec->qbr_time = pool_type == LQUOTA_RES_MD ?
+ rec->qbr_time = rtype == LQUOTA_RES_MD ?
MAX_IQ_TIME : MAX_DQ_TIME;
rc = lquota_disk_write_glb(env, obj, 0, rec);
if (rc) {
- CERROR("%s: failed to set default "
- "grace time for %s type (%d)\n",
- qmt->qmt_svname,
- QTYPE_NAME(qtype), rc);
+ CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
+ qmt->qmt_svname, qtype_name(qtype), rc);
RETURN(rc);
}
rc = lquota_disk_update_ver(env, dev, obj, 1);
if (rc) {
- CERROR("%s: failed to set initial "
- "version for %s type (%d)\n",
- qmt->qmt_svname,
- QTYPE_NAME(qtype), rc);
+ CERROR("%s: failed to set initial version for %s type: rc = %d\n",
+ qmt->qmt_svname, qtype_name(qtype), rc);
RETURN(rc);
}
}
&qmt_lqe_ops);
if (IS_ERR(pool->qpi_site[qtype])) {
rc = PTR_ERR(pool->qpi_site[qtype]);
- CERROR("%s: failed to create site for %s type "
- "(%d)\n", qmt->qmt_svname,
- QTYPE_NAME(qtype), rc);
+ CERROR("%s: failed to create site for %s type: rc = %d\n",
+ qmt->qmt_svname, qtype_name(qtype), rc);
RETURN(rc);
}
/* count number of slaves which already connected to
* the master in the past */
- pool->qpi_slv_nr[qtype] = 0;
+ for (i = 0; i < QMT_STYPE_CNT; i++)
+ pool->qpi_slv_nr[i][qtype] = 0;
+
rc = lquota_disk_for_each_slv(env, pool->qpi_root,
&qti->qti_fid,
qmt_slv_cnt,
- &pool->qpi_slv_nr[qtype]);
+ &pool->qpi_slv_nr);
if (rc) {
- CERROR("%s: failed to scan & count slave "
- "indexes for %s type (%d)\n",
- qmt->qmt_svname, QTYPE_NAME(qtype), rc);
+ CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
+ qmt->qmt_svname, qtype_name(qtype), rc);
RETURN(rc);
}
#ifdef CONFIG_PROC_FS
/* add procfs file to dump the global index, mostly for
* debugging purpose */
- sprintf(qti->qti_buf, "glb-%s", QTYPE_NAME(qtype));
+ snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
+ "glb-%s", qtype_name(qtype));
rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
0444, &lprocfs_quota_seq_fops,
obj);
if (rc)
- CWARN("%s: Error adding procfs file for global"
- "quota index "DFID", rc:%d\n",
+ CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
qmt->qmt_svname, PFID(&qti->qti_fid), rc);
#endif
}
+ if (name)
+ break;
}
RETURN(0);
{
struct qmt_pool_info *pool;
struct dt_object *slv_obj;
- int pool_id, pool_type, qtype;
+ int pool_type, qtype, stype;
bool created = false;
- int rc = 0;
+ int idx, i, rc = 0;
+
+ stype = qmt_uuid2idx(uuid, &idx);
+ if (stype < 0)
+ RETURN(stype);
/* extract pool info from global index FID */
- rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype);
+ rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
if (rc)
RETURN(rc);
/* look-up pool in charge of this global index FID */
- pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
+ qti_pools_init(env);
+ pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
if (IS_ERR(pool))
RETURN(PTR_ERR(pool));
/* retrieve slave fid & current object version */
memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
*slv_ver = dt_version_get(env, slv_obj);
- lu_object_put(env, &slv_obj->do_lu);
+ dt_object_put(env, slv_obj);
if (created)
- pool->qpi_slv_nr[qtype]++;
+ for (i = 0; i < qti_pools_cnt(env); i++)
+ qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
out:
- qpi_putref(env, pool);
+ qti_pools_fini(env);
RETURN(rc);
}
* Look-up a lquota_entry in the pool hash and allocate it if not found.
*
* \param env - is the environment passed by the caller
- * \param qmt - is the quota master target for which we have to initializa the
+ * \param qmt - is the quota master target for which we have to initialize the
* pool configuration
- * \param pool_id - is the 16-bit identifier of the pool
* \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
* \param qtype - is the quota type, either user or group.
* \param qid - is the quota ID to look-up
*/
struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
struct qmt_device *qmt,
- int pool_id, int pool_type,
- int qtype, union lquota_id *qid)
+ int pool_type, int qtype,
+ union lquota_id *qid,
+ char *pool_name)
{
struct qmt_pool_info *pool;
struct lquota_entry *lqe;
ENTRY;
/* look-up pool responsible for this global index FID */
- pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
+ pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
if (IS_ERR(pool))
- RETURN((void *)pool);
+ RETURN(ERR_CAST(pool));
if (qid->qid_uid == 0) {
/* caller wants to access grace time, no need to look up the
qpi_putref(env, pool);
RETURN(lqe);
}
+
+/* Collect into qti_lqes every lquota entry of @qid from all pools of
+ * @rtype that cover target index @idx (-1 means the global pool only).
+ * Matched pools keep a reference and hold qpi_recalc_sem read-locked
+ * until the caller runs qti_lqes_fini(). Returns 0 or negative errno. */
+int qmt_pool_lqes_lookup(const struct lu_env *env,
+ struct qmt_device *qmt,
+ int rtype, int stype,
+ int qtype, union lquota_id *qid,
+ char *pool_name, int idx)
+{
+ struct qmt_pool_info *pool;
+ struct lquota_entry *lqe;
+ int rc, i;
+ ENTRY;
+
+ /* Until MDT pools are implemented, all MDTs belong to the
+ * global pool, thus lookup lqes only from the global pool. */
+ if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+ idx = -1;
+
+ qti_pools_init(env);
+ rc = 0;
+ /* look-up pool responsible for this global index FID */
+ pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
+ if (IS_ERR(pool)) {
+ qti_pools_fini(env);
+ RETURN(PTR_ERR(pool));
+ }
+
+ /* now that we have the pool, let's look-up the quota entry in the
+ * right quota site */
+ qti_lqes_init(env);
+ for (i = 0; i < qti_pools_cnt(env); i++) {
+ pool = qti_pools_env(env)[i];
+ lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
+ if (IS_ERR(lqe)) {
+ qti_lqes_fini(env);
+ GOTO(out, rc = PTR_ERR(lqe));
+ }
+ /* Only release could be done for not enforced lqe
+ * (see qmt_dqacq0). However slave could request to
+ * release more than not global lqe had granted before
+ * lqe_enforced was cleared. It is legal case,
+ * because even if current lqe is not enforced,
+ * lqes from other pools are still active and available
+ * for acquiring. Furthermore, skip not enforced lqe
+ * to avoid extra allocations. */
+ /*if (!lqe_is_glbl(lqe) && !lqe->lqe_enforced) {
+ lqe_putref(lqe);
+ continue;
+ }*/
+ qti_lqes_add(env, lqe);
+ }
+ LASSERT(qti_lqes_glbl(env)->lqe_is_global);
+
+out:
+ qti_pools_fini(env);
+ RETURN(rc);
+}
+
+/*
+ * Comparator for sort(): order lquota entries by ascending qunit.
+ * Compare explicitly instead of returning the subtraction result: the
+ * difference of two (potentially 64-bit) qunit values truncated to int
+ * can have the wrong sign and corrupt the sort order.
+ */
+static int lqes_cmp(const void *arg1, const void *arg2)
+{
+	const struct lquota_entry *lqe1, *lqe2;
+
+	lqe1 = *(const struct lquota_entry **)arg1;
+	lqe2 = *(const struct lquota_entry **)arg2;
+	if (lqe1->lqe_qunit < lqe2->lqe_qunit)
+		return -1;
+	if (lqe1->lqe_qunit > lqe2->lqe_qunit)
+		return 1;
+	return 0;
+}
+
+/* Sort the collected lqes by ascending qunit; sort() may move the
+ * global lqe away from slot 0, so re-locate it and refresh the cached
+ * index used by qti_lqes_glbl(). */
+void qmt_lqes_sort(const struct lu_env *env)
+{
+ sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
+ /* global lqe was moved during sorting */
+ if (!qti_lqes_glbl(env)->lqe_is_global) {
+ int i;
+ for (i = 0; i < qti_lqes_cnt(env); i++) {
+ if (qti_lqes(env)[i]->lqe_is_global) {
+ qti_glbl_lqe_idx(env) = i;
+ break;
+ }
+ }
+ }
+}
+
+/* Gather into qti_lqes every already-existing, enforced lquota entry
+ * for @qid across all pools of @rtype that have at least one slave of
+ * this quota type. Returns 0, or the last lookup error while keeping
+ * any lqes found so far, so callers can still handle partial results.
+ * NOTE(review): qti_lqes_init() is called here but qti_lqes_fini() is
+ * left to the caller even on error — confirm all callers clean up. */
+int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
+ int rtype, int qtype, union lquota_id *qid)
+{
+ struct qmt_pool_info *pos;
+ struct lquota_entry *lqe;
+ int rc = 0;
+
+ qti_lqes_init(env);
+ down_read(&qmt->qmt_pool_lock);
+ if (list_empty(&qmt->qmt_pool_list)) {
+ up_read(&qmt->qmt_pool_lock);
+ RETURN(-ENOENT);
+ }
+
+ list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
+ if (pos->qpi_rtype != rtype)
+ continue;
+ /* Don't take into account pools without slaves */
+ if (!qpi_slv_nr(pos, qtype))
+ continue;
+ lqe = lqe_find(env, pos->qpi_site[qtype], qid);
+ /* ENOENT is valid case for lqe from non global pool
+ * that hasn't limits, i.e. not enforced. Continue even
+ * in case of error - we can handle already found lqes */
+ if (IS_ERR_OR_NULL(lqe)) {
+ /* let know that something went wrong */
+ rc = lqe ? PTR_ERR(lqe) : -ENOENT;
+ continue;
+ }
+ if (!lqe->lqe_enforced) {
+ /* no settings for this qid_uid */
+ lqe_putref(lqe);
+ continue;
+ }
+ qti_lqes_add(env, lqe);
+ CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
+ lqe, pos->qpi_name);
+ }
+ up_read(&qmt->qmt_pool_lock);
+ RETURN(rc);
+}
+
+/**
+ * Create a new quota pool on the quota master target.
+ *
+ * Allocate a new qmt_pool_info structure for the pool named
+ * \a poolname and set up its on-disk indexes. Only data (LQUOTA_RES_DT)
+ * pools are created for now. The new pool is created with a single
+ * reference and is freed when the reference count drops to zero.
+ *
+ * \param[in] obd Lustre OBD device the pool is added on
+ * \param[in] poolname the name of the pool to be created
+ *
+ * \retval 0 in case of success
+ * \retval negative error code in case of error
+ */
+int qmt_pool_new(struct obd_device *obd, char *poolname)
+{
+ struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
+ struct qmt_pool_info *qpi;
+ struct lu_env env;
+ int rc;
+ ENTRY;
+
+ if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+ RETURN(-ENAMETOOLONG);
+
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+ if (!IS_ERR(qpi)) {
+ /* Valid case when several MDTs are mounted
+ * at the same node. */
+ CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
+ qpi_putref(&env, qpi);
+ GOTO(out_env, rc = -EEXIST);
+ }
+ if (PTR_ERR(qpi) != -ENOENT) {
+ CWARN("%s: pool %s lookup failed: rc = %ld\n",
+ obd->obd_name, poolname, PTR_ERR(qpi));
+ GOTO(out_env, rc = PTR_ERR(qpi));
+ }
+
+ /* Now allocate and prepare only DATA pool.
+ * Further when MDT pools will be ready we need to add
+ * a cycle here and setup pools of both types. Another
+ * approach is to find out pool of which type should be
+ * created. */
+ rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
+ if (rc) {
+ CERROR("%s: can't alloc pool %s: rc = %d\n",
+ obd->obd_name, poolname, rc);
+ GOTO(out_env, rc);
+ }
+
+ rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
+ if (rc) {
+ CERROR("%s: can't prepare pool for %s: rc = %d\n",
+ obd->obd_name, poolname, rc);
+ GOTO(out_err, rc);
+ }
+
+ CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
+ poolname);
+
+ GOTO(out_env, rc);
+out_err:
+ /* Undo the allocation above: drop both the reference taken by the
+ * lookup and the initial reference from qmt_pool_alloc so the
+ * half-set-up pool is actually destroyed. */
+ qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+ if (!IS_ERR(qpi)) {
+ qpi_putref(&env, qpi);
+ qpi_putref(&env, qpi);
+ }
+out_env:
+ lu_env_fini(&env);
+ return rc;
+}
+
+/*
+ * Walk a slave index object and add the space granted for each quota ID
+ * to lqe_recalc_granted of the matching lquota entry in \a site. ID 0
+ * (root user/group) is skipped. The walk stops early when the recalc
+ * kthread is asked to stop.
+ *
+ * \param env  - is the environment passed by the caller
+ * \param obj  - is the slave index object to iterate over
+ * \param site - is the quota site holding the lquota entries
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+static int
+qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
+	       struct lquota_site *site)
+{
+	struct qmt_thread_info *qti = qmt_info(env);
+	union lquota_id *qid = &qti->qti_id;
+	const struct dt_it_ops *iops;
+	struct dt_key *key;
+	struct dt_it *it;
+	__u64 granted = 0;
+	int rc;
+	ENTRY;
+
+	iops = &obj->do_index_ops->dio_it;
+
+	it = iops->init(env, obj, 0);
+	if (IS_ERR(it)) {
+		CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
+		      PFID(&qti->qti_fid), PTR_ERR(it));
+		RETURN(PTR_ERR(it));
+	}
+
+	rc = iops->load(env, it, 0);
+	if (rc < 0) {
+		CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
+		      PFID(&qti->qti_fid), rc);
+		GOTO(out, rc);
+	} else if (rc == 0) {
+		rc = iops->next(env, it);
+		if (rc != 0)
+			GOTO(out, rc = (rc < 0) ? rc : 0);
+	}
+
+	do {
+		struct lquota_entry *lqe;
+
+		key = iops->key(env, it);
+		if (IS_ERR(key)) {
+			CWARN("quota: error key for "DFID": rc = %ld\n",
+			      PFID(&qti->qti_fid), PTR_ERR(key));
+			GOTO(out, rc = PTR_ERR(key));
+		}
+
+		/* skip the root user/group */
+		if (*((__u64 *)key) == 0)
+			goto next;
+
+		qid->qid_uid = *((__u64 *)key);
+
+		/* Skip entries that cannot be read: previously 'granted'
+		 * was tested without checking rc, so a read failure left
+		 * it uninitialized. */
+		rc = qmt_slv_read(env, qid, obj, &granted);
+		if (rc || !granted)
+			goto next;
+
+		lqe = lqe_locate(env, site, qid);
+		if (IS_ERR(lqe))
+			GOTO(out, rc = PTR_ERR(lqe));
+		lqe_write_lock(lqe);
+		lqe->lqe_recalc_granted += granted;
+		lqe_write_unlock(lqe);
+		lqe_putref(lqe);
+next:
+		rc = iops->next(env, it);
+		if (rc < 0)
+			CWARN("quota: failed to parse index "DFID
+			      ", ->next error: rc = %d\n",
+			      PFID(&qti->qti_fid), rc);
+	} while (rc == 0 && !kthread_should_stop());
+
+out:
+	iops->put(env, it);
+	iops->fini(env, it);
+	RETURN(rc);
+}
+
+/* cfs_hash for-each callback: commit the recomputed granted space of
+ * one lquota entry. When the recalculated total differs from
+ * lqe_granted, adopt it, re-adjust qunit/edquot (notifying slaves if
+ * anything changed) and persist the entry in a local transaction.
+ * Always returns 0 so the hash iteration continues.
+ * NOTE(review): failures from dt_trans_create()/qmt_glb_write() are
+ * swallowed here, so the on-disk copy may stay stale — confirm this
+ * best-effort behaviour is intended. */
+static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *data)
+{
+ struct lquota_entry *lqe;
+ struct lu_env *env = data;
+
+ lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+
+ lqe_write_lock(lqe);
+ if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
+ struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
+ struct thandle *th;
+ bool need_notify = false;
+ int rc;
+
+ LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
+ lqe->lqe_recalc_granted);
+ lqe->lqe_granted = lqe->lqe_recalc_granted;
+ /* Always returns true, if there is no slaves in a pool */
+ need_notify |= qmt_adjust_qunit(env, lqe);
+ need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
+ if (need_notify) {
+ /* Find all lqes with lqe_id to reseed lgd array */
+ rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
+ lqe_qtype(lqe), &lqe->lqe_id);
+ if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
+ qmt_seed_glbe(env,
+ qti_lqes_glbl(env)->lqe_glbl_data);
+ qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
+ }
+ qti_lqes_fini(env);
+ }
+ th = dt_trans_create(env, qmt->qmt_child);
+ if (IS_ERR(th))
+ goto out;
+
+ rc = lquota_disk_declare_write(env, th,
+ LQE_GLB_OBJ(lqe),
+ &lqe->lqe_id);
+ if (rc)
+ GOTO(out_stop, rc);
+
+ rc = dt_trans_start_local(env, qmt->qmt_child, th);
+ if (rc)
+ GOTO(out_stop, rc);
+
+ qmt_glb_write(env, th, lqe, 0, NULL);
+out_stop:
+ dt_trans_stop(env, qmt->qmt_child, th);
+ }
+out:
+ /* reset for the next recalc pass, even if nothing was written */
+ lqe->lqe_recalc_granted = 0;
+ lqe_write_unlock(lqe);
+
+ return 0;
+}
+
+/* buffer size for "<fsname>-MDT0000" device names built below */
+#define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
+/*
+ * Look up the MGC obd_device through the local MDT0000 mount of the
+ * filesystem this QMT serves.  The caller uses it to wait until MGS
+ * config processing is done (obd_process_conf).
+ *
+ * NOTE(review): the returned obd pointer is used after lustre_put_lsi()
+ * dropped the superblock reference - assumes the MGC outlives the
+ * recalc thread; confirm.
+ *
+ * \retval obd_device pointer on success, ERR_PTR(-errno) on failure
+ */
+static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
+{
+ char mdt_name[MDT_DEV_NAME_LEN];
+ struct lustre_mount_info *lmi;
+ struct obd_device *obd;
+ int rc;
+ ENTRY;
+
+ rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
+ if (rc) {
+ CERROR("quota: cannot get server name from %s: rc = %d\n",
+ qmt->qmt_svname, rc);
+ RETURN(ERR_PTR(rc));
+ }
+
+ strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
+ lmi = server_get_mount(mdt_name);
+ if (lmi == NULL) {
+ rc = -ENOENT;
+ CERROR("%s: cannot get mount info from %s: rc = %d\n",
+ qmt->qmt_svname, mdt_name, rc);
+ RETURN(ERR_PTR(rc));
+ }
+ obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
+ lustre_put_lsi(lmi->lmi_sb);
+
+ RETURN(obd);
+}
+
+/*
+ * Body of the pool recalculation thread.  After waiting for MGS config
+ * processing to finish, it walks, for each quota type, the slave index
+ * file of every OST in the pool to accumulate granted space
+ * (qmt_obj_recalc) and then resyncs the in-memory and on-disk values via
+ * qmt_site_recalc_cb.
+ *
+ * \param args - qmt_pool_info pointer; the thread owns one reference on
+ *  it (taken in qmt_start_pool_recalc) and drops it on exit
+ */
+static int qmt_pool_recalc(void *args)
+{
+ struct qmt_pool_info *pool, *glbl_pool;
+ struct rw_semaphore *sem = NULL;
+ struct obd_device *obd;
+ struct lu_env env;
+ int i, rc, qtype, slaves_cnt;
+ ENTRY;
+
+ pool = args;
+
+ obd = qmt_get_mgc(pool->qpi_qmt);
+ if (IS_ERR(obd))
+ GOTO(out, rc = PTR_ERR(obd));
+ else
+ /* Waiting for the end of processing mgs config.
+ * It is needed to be sure all pools are configured. */
+ while (obd->obd_process_conf)
+ schedule_timeout_uninterruptible(cfs_time_seconds(1));
+
+ sem = qmt_sarr_rwsem(pool);
+ LASSERT(sem);
+ down_read(sem);
+ /* Hold this to be sure that OSTs from this pool
+ * can't do acquire/release.
+ *
+ * I guess below write semaphore could be a bottleneck
+ * as qmt_dqacq would be blocked trying to hold
+ * read_lock at qmt_pool_lookup->qti_pools_add.
+ * But on the other hand adding/removing OSTs to the pool is
+ * a rare operation. If finally this would be a problem,
+ * we can consider another approach. For example we can
+ * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
+ * and go through appropriate OSTs. I don't use this approach now
+ * as newly created pool hasn't lqes entries. So firstly we need
+ * to get this lqes from the global pool index file. This
+ * solution looks more complex, so leave it as it is. */
+ down_write(&pool->qpi_recalc_sem);
+
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+
+ /* global pool is needed to find the per-slave index files */
+ glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
+ if (IS_ERR(glbl_pool))
+ GOTO(out_env, rc = PTR_ERR(glbl_pool));
+
+ slaves_cnt = qmt_sarr_count(pool);
+ CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
+ slaves_cnt, pool->qpi_name);
+
+ for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+ for (i = 0; i < slaves_cnt; i++) {
+ struct qmt_thread_info *qti = qmt_info(&env);
+ struct dt_object *slv_obj;
+ struct obd_uuid uuid;
+ int idx;
+
+ /* bail out early if someone is stopping us */
+ if (kthread_should_stop())
+ GOTO(out_stop, rc = 0);
+ idx = qmt_sarr_get_idx(pool, i);
+ LASSERT(idx >= 0);
+
+ /* We don't need fsname here - anyway
+ * lquota_disk_slv_filename ignores it. */
+ snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+ lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+ qtype);
+ /* look-up index file associated with acquiring slave */
+ slv_obj = lquota_disk_slv_find(&env,
+ glbl_pool->qpi_qmt->qmt_child,
+ glbl_pool->qpi_root,
+ &qti->qti_fid,
+ &uuid);
+ if (IS_ERR(slv_obj))
+ GOTO(out_stop, rc = PTR_ERR(slv_obj));
+
+ CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
+ slv_obj, uuid.uuid);
+ qmt_obj_recalc(&env, slv_obj, pool->qpi_site[qtype]);
+ dt_object_put(&env, slv_obj);
+ }
+ /* Now go trough the site hash and compare lqe_granted
+ * with lqe_calc_granted. Write new value if disagree */
+
+ cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
+ qmt_site_recalc_cb, &env);
+ }
+ GOTO(out_stop, rc);
+out_stop:
+ qpi_putref(&env, glbl_pool);
+out_env:
+ lu_env_fini(&env);
+out:
+ if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
+ /*
+ * Someone is waiting for us to stop - be sure not to exit
+ * before kthread_stop() gets a ref on the task. No event
+ * will happen on 'pool', this is just a convenient way to
+ * wait.
+ */
+ wait_var_event(pool, kthread_should_stop());
+
+ clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
+ /* Pool can't be changed, since sem has been down.
+ * Thus until up_read, no one can restart recalc thread. */
+ if (sem) {
+ up_read(sem);
+ up_write(&pool->qpi_recalc_sem);
+ }
+ /* FIXME(review): 'env' is used here after lu_env_fini() on the
+ * normal path, and is completely uninitialized when qmt_get_mgc()
+ * or lu_env_init() failed - qpi_putref() must not depend on a
+ * live env on this path */
+ qpi_putref(&env, pool);
+
+ return rc;
+}
+
+/*
+ * Start the recalculation thread for pool \a qpi unless one is already
+ * running.  QPI_FLAG_RECALC_OFFSET serializes starters; the thread gets
+ * its own reference on \a qpi and drops it when it exits.
+ *
+ * \retval 0 on success (or when a thread already runs), negative errno
+ *  if the thread could not be created
+ */
+static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
+{
+ struct task_struct *task;
+ int rc = 0;
+
+ if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
+ LASSERT(!qpi->qpi_recalc_task);
+
+ /* reference for the thread, dropped in qmt_pool_recalc() */
+ qpi_getref(qpi);
+ /* NOTE(review): thread name "qsd_reint_%s" looks like a
+ * copy-paste from the QSD code; a qmt-specific name would
+ * be clearer - confirm before renaming */
+ task = kthread_create(qmt_pool_recalc, qpi,
+ "qsd_reint_%s", qpi->qpi_name);
+ if (IS_ERR(task)) {
+ clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
+ rc = PTR_ERR(task);
+ qpi_putref(env, qpi);
+ } else {
+ qpi->qpi_recalc_task = task;
+ /* Using park/unpark to start the thread ensures that
+ * the thread function does get calls, so the
+ * ref on qpi will be dropped
+ */
+ kthread_park(task);
+ kthread_unpark(task);
+ }
+ }
+
+ RETURN(rc);
+}
+
+/*
+ * Stop the pool recalculation thread if one is running.  xchg() makes
+ * sure only a single caller (this function or the exiting thread itself
+ * in qmt_pool_recalc) consumes qpi_recalc_task.
+ */
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
+{
+ struct task_struct *task;
+
+ task = xchg(&qpi->qpi_recalc_task, NULL);
+ if (task)
+ kthread_stop(task);
+}
+
+/*
+ * Update the per-quota-type OST slave counters of \a pool after slave
+ * \a idx was added to (\a add true) or removed from (\a add false) it.
+ * A qtype is counted only when the slave's index file exists under the
+ * global pool; lookup failures are silently skipped.
+ *
+ * \retval 0 always; failure to find the global pool is returned as
+ *  a negative errno
+ */
+static int qmt_pool_slv_nr_change(const struct lu_env *env,
+ struct qmt_pool_info *pool,
+ int idx, bool add)
+{
+ struct qmt_pool_info *glbl_pool;
+ int qtype;
+
+ glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
+ if (IS_ERR(glbl_pool))
+ RETURN(PTR_ERR(glbl_pool));
+
+ for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct dt_object *slv_obj;
+ struct obd_uuid uuid;
+
+ /* We don't need fsname here - anyway
+ * lquota_disk_slv_filename ignores it. */
+ snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+ lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+ qtype);
+ /* look-up index file associated with acquiring slave */
+ slv_obj = lquota_disk_slv_find(env,
+ glbl_pool->qpi_qmt->qmt_child,
+ glbl_pool->qpi_root,
+ &qti->qti_fid,
+ &uuid);
+ if (IS_ERR(slv_obj))
+ continue;
+
+ if (add)
+ pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
+ else
+ pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
+ dt_object_put(env, slv_obj);
+ }
+ qpi_putref(env, glbl_pool);
+
+ return 0;
+}
+
+/*
+ * Add or remove slave \a slavename to/from quota pool \a poolname and
+ * kick off a pool-wide usage recalculation.
+ *
+ * \param obd       - QMT obd device
+ * \param poolname  - name of an existing quota pool
+ * \param slavename - OST server name, e.g. "fsname-OST0000"
+ * \param add       - true to add the slave, false to remove it
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
+ char *slavename, bool add)
+{
+ struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
+ struct qmt_pool_info *qpi;
+ struct lu_env env;
+ int rc, idx;
+ ENTRY;
+
+ if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+ RETURN(-ENAMETOOLONG);
+
+ CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
+ "%s: pool %s, removing %s\n",
+ obd->obd_name, poolname, slavename);
+
+ rc = server_name2index(slavename, &idx, NULL);
+ /* only OST slaves make sense in a data quota pool */
+ if (rc != LDD_F_SV_TYPE_OST)
+ RETURN(-EINVAL);
+
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+ if (IS_ERR(qpi)) {
+ /* fixed invalid conversion "%long" (octal + "ng") -> %ld */
+ CWARN("%s: can't find pool %s: rc = %ld\n",
+ obd->obd_name, poolname, PTR_ERR(qpi));
+ GOTO(out, rc = PTR_ERR(qpi));
+ }
+
+ rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
+ qmt_sarr_pool_rem(qpi, idx);
+ if (rc) {
+ /* fixed argument order: device name goes with leading %s: */
+ CERROR("%s: can't %s %s pool %s: rc = %d\n",
+ obd->obd_name, add ? "add to" : "remove",
+ slavename, poolname, rc);
+ GOTO(out_putref, rc);
+ }
+ /* both calls below are best-effort, their errors are not fatal */
+ qmt_pool_slv_nr_change(&env, qpi, idx, add);
+ qmt_start_pool_recalc(&env, qpi);
+
+out_putref:
+ qpi_putref(&env, qpi);
+out:
+ lu_env_fini(&env);
+ RETURN(rc);
+}
+
+
+
+/**
+ * Add a single target device to the named pool.
+ *
+ * \param[in] obd OBD device on which to add the pool
+ * \param[in] poolname name of the pool to which to add the target \a slavename
+ * \param[in] slavename name of the target device to be added
+ *
+ * \retval 0 if \a slavename was (previously) added to the pool
+ * \retval negative error number on failure
+ */
+int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
+{
+ /* thin wrapper: the shared logic lives in qmt_pool_add_rem() */
+ return qmt_pool_add_rem(obd, poolname, slavename, true);
+}
+
+/**
+ * Remove the named target from the specified pool.
+ *
+ * \param[in] obd OBD device from which to remove \a poolname
+ * \param[in] poolname name of the pool to be changed
+ * \param[in] slavename name of the target to remove from \a poolname
+ *
+ * \retval 0 on successfully removing \a slavename from the pool
+ * \retval negative number on error (e.g. \a slavename not in pool)
+ */
+int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
+{
+ /* thin wrapper: the shared logic lives in qmt_pool_add_rem() */
+ return qmt_pool_add_rem(obd, poolname, slavename, false);
+}
+
+/**
+ * Remove the named pool from the QMT device.
+ *
+ * \param[in] obd OBD device on which pool was previously created
+ * \param[in] poolname name of pool to remove from \a obd
+ *
+ * \retval 0 on successfully removing the pool
+ * \retval negative error numbers for failures
+ */
+int qmt_pool_del(struct obd_device *obd, char *poolname)
+{
+ struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
+ struct qmt_pool_info *qpi;
+ struct lu_fid fid;
+ char buf[LQUOTA_NAME_MAX];
+ struct lu_env env;
+ int rc;
+ int qtype;
+ ENTRY;
+
+ if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+ RETURN(-ENAMETOOLONG);
+
+ CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
+ poolname);
+
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ /* look-up pool in charge of this global index FID */
+ qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+ if (IS_ERR(qpi)) {
+ /* Valid case for several MDTs at the same node -
+ * pool removed by the 1st MDT in config */
+ CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
+ lu_env_fini(&env);
+ RETURN(PTR_ERR(qpi));
+ }
+
+ /* unlink the per-qtype global index files under the pool dir */
+ for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+ lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
+ snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
+ rc = local_object_unlink(&env, qmt->qmt_child,
+ qpi->qpi_root, buf);
+ if (rc)
+ CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
+ obd->obd_name, buf, poolname, rc);
+ }
+
+ /* put ref from look-up */
+ qpi_putref(&env, qpi);
+ /* put last ref to free qpi */
+ qpi_putref(&env, qpi);
+
+ snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
+ RES_NAME(LQUOTA_RES_DT), poolname);
+ rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
+ if (rc)
+ CWARN("%s: cannot unlink dir %s: rc = %d\n",
+ obd->obd_name, poolname, rc);
+
+ lu_env_fini(&env);
+ /* NOTE(review): unlink failures above are only logged and 0 is
+ * returned regardless - looks intentional (pool state is already
+ * gone from memory); confirm */
+ RETURN(0);
+}
+
+/* Initialize the slave array of \a qpi (OST list for data pools).
+ * \retval 0 on success or when no array is needed, negative errno on
+ * allocation failure */
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
+{
+
+ /* No need to initialize sarray for global pool
+ * as it always includes all slaves */
+ if (qmt_pool_global(qpi))
+ return 0;
+
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ return tgt_pool_init(&qpi->qpi_sarr.osts, 0);
+ case LQUOTA_RES_MD:
+ /* MD pools don't keep a slave array */
+ default:
+ return 0;
+ }
+}
+
+/* Add OST index \a idx to the slave array of \a qpi.
+ * \a min is the allocation step passed down to tgt_pool_add(). */
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
+{
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ return tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
+ case LQUOTA_RES_MD:
+ default:
+ return 0;
+ }
+}
+
+/* Remove OST index \a idx from the slave array of \a qpi. */
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
+{
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ return tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
+ case LQUOTA_RES_MD:
+ default:
+ return 0;
+ }
+}
+
+/* Free the slave array of \a qpi; a no-op for the global pool or when
+ * the array was never allocated (op_array == NULL). */
+static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi)
+{
+ if (qmt_pool_global(qpi))
+ return 0;
+
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ /* guard against freeing a never-initialized array */
+ if (!qpi->qpi_sarr.osts.op_array)
+ return 0;
+ return tgt_pool_free(&qpi->qpi_sarr.osts);
+ case LQUOTA_RES_MD:
+ default:
+ return 0;
+ }
+}
+
+/* Check whether OST index \a idx belongs to pool \a qpi.
+ * The global pool contains every slave, so the check always succeeds. */
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
+{
+ if (qmt_pool_global(qpi))
+ return 0;
+
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ return tgt_check_index(idx, &qpi->qpi_sarr.osts);
+ case LQUOTA_RES_MD:
+ default:
+ return 0;
+ }
+}
+
+/* Return the rw_semaphore protecting the slave array of \a qpi,
+ * or NULL when the pool type keeps no array (MD pools). */
+struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
+{
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ /* to protect ost_pool use */
+ return &qpi->qpi_sarr.osts.op_rw_sem;
+ case LQUOTA_RES_MD:
+ default:
+ return NULL;
+ }
+}
+
+/* Translate array position \a arr_idx into the OST index stored there.
+ * For the global pool positions and indexes coincide.
+ * \retval OST index, or -EINVAL for pool types without a slave array */
+int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
+{
+
+ if (qmt_pool_global(qpi))
+ return arr_idx;
+
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
+ "idx invalid %d op_count %d\n", arr_idx,
+ qpi->qpi_sarr.osts.op_count);
+ return qpi->qpi_sarr.osts.op_array[arr_idx];
+ case LQUOTA_RES_MD:
+ default:
+ return -EINVAL;
+ }
+}
+
+/* Number of slaves in a pool */
+unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
+{
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ return qpi->qpi_sarr.osts.op_count;
+ case LQUOTA_RES_MD:
+ default:
+ /* NOTE(review): -EINVAL returned from an unsigned function
+ * becomes a huge positive count for the caller - callers
+ * presumably never pass MD pools here; confirm */
+ return -EINVAL;
+ }
+}