Whamcloud - gitweb
LU-15097 quota: stop pool_recalc before killing pool
[fs/lustre-release.git] / lustre / quota / qmt_pool.c
index 47f5b12..c4398a5 100644 (file)
@@ -21,7 +21,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
@@ -29,7 +29,7 @@
  */
 
 /*
- * A Quota Master Target has a hash table where it stores qmt_pool_info
+ * A Quota Master Target has a list(qmt_pool_list) where it stores qmt_pool_info
  * structures. There is one such structure for each pool managed by the QMT.
  *
  * Each pool can have different quota types enforced (typically user & group
 #include <lprocfs_status.h>
 #include "qmt_internal.h"
 
-static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
+                                   int idx, int min);
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
+static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi);
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
 
 /*
  * Static helper functions not used outside the scope of this file
  */
 
-/*
- * Reference counter management for qmt_pool_info structures
- */
-static inline void qpi_getref(struct qmt_pool_info *pool)
-{
-       atomic_inc(&pool->qpi_ref);
-}
-
-static inline void qpi_putref(const struct lu_env *env,
-                             struct qmt_pool_info *pool)
-{
-       LASSERT(atomic_read(&pool->qpi_ref) > 0);
-       if (atomic_dec_and_test(&pool->qpi_ref))
-               qmt_pool_free(env, pool);
-}
-
 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
 {
        LASSERT(atomic_read(&pool->qpi_ref) > 1);
        atomic_dec(&pool->qpi_ref);
 }
 
-/*
- * Hash functions for qmt_pool_info management
- */
-
-static unsigned
-qpi_hash_hash(struct cfs_hash *hs, const void *key, unsigned mask)
-{
-       return cfs_hash_u32_hash(*((__u32 *)key), mask);
-}
-
-static void *qpi_hash_key(struct hlist_node *hnode)
-{
-       struct qmt_pool_info *pool;
-       pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
-       return &pool->qpi_key;
-}
-
-static int qpi_hash_keycmp(const void *key, struct hlist_node *hnode)
-{
-       struct qmt_pool_info *pool;
-       pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
-       return pool->qpi_key == *((__u32 *)key);
-}
-
-static void *qpi_hash_object(struct hlist_node *hnode)
-{
-       return hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
-}
-
-static void qpi_hash_get(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       struct qmt_pool_info *pool;
-       pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
-       qpi_getref(pool);
-}
-
-static void qpi_hash_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       struct qmt_pool_info *pool;
-       pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
-       qpi_putref_locked(pool);
-}
-
-static void qpi_hash_exit(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       CERROR("Should not have any item left!\n");
-}
-
-/* vector of hash operations */
-static struct cfs_hash_ops qpi_hash_ops = {
-       .hs_hash        = qpi_hash_hash,
-       .hs_key         = qpi_hash_key,
-       .hs_keycmp      = qpi_hash_keycmp,
-       .hs_object      = qpi_hash_object,
-       .hs_get         = qpi_hash_get,
-       .hs_put_locked  = qpi_hash_put_locked,
-       .hs_exit        = qpi_hash_exit
-};
-
 /* some procfs helpers */
 static int qpi_state_seq_show(struct seq_file *m, void *data)
 {
@@ -148,23 +79,25 @@ static int qpi_state_seq_show(struct seq_file *m, void *data)
        int                      type;
 
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
        seq_printf(m, "pool:\n"
                   "    id: %u\n"
                   "    type: %s\n"
                   "    ref: %d\n"
                   "    least qunit: %lu\n",
-                  pool->qpi_key & 0x0000ffff,
-                  RES_NAME(pool->qpi_key >> 16),
+                  0,
+                  RES_NAME(pool->qpi_rtype),
                   atomic_read(&pool->qpi_ref),
                   pool->qpi_least_qunit);
 
-       for (type = 0; type < MAXQUOTAS; type++)
+       for (type = 0; type < LL_MAXQUOTAS; type++)
                seq_printf(m, "    %s:\n"
                           "        #slv: %d\n"
                           "        #lqe: %d\n",
-                          QTYPE_NAME(type),
-                          pool->qpi_slv_nr[type],
+                          qtype_name(type),
+                          qpi_slv_nr(pool, type),
                    atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
 
        return 0;
@@ -175,26 +108,31 @@ static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
 {
        struct qmt_pool_info    *pool = m->private;
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
-       return seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
+       seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
+       return 0;
 }
 
 static ssize_t
 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
                               size_t count, loff_t *off)
 {
-       struct qmt_pool_info    *pool;
-       int     qunit, rc;
-       s64     least_qunit;
+       struct seq_file *m = file->private_data;
+       struct qmt_pool_info *pool = m->private;
+       long long least_qunit, qunit;
+       int rc;
 
-       pool = ((struct seq_file *)file->private_data)->private;
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
        /* Not tuneable for inode limit */
-       if (pool->qpi_key >> 16 != LQUOTA_RES_DT)
+       if (pool->qpi_rtype != LQUOTA_RES_DT)
                return -EINVAL;
 
-       rc = lprocfs_str_to_s64(buffer, count, &least_qunit);
+       rc = kstrtoll_from_user(buffer, count, 0, &least_qunit);
        if (rc)
                return rc;
 
@@ -221,19 +159,17 @@ static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
 };
 
 /*
- * Allocate a new qmt_pool_info structure and add it to the pool hash table
- * of the qmt.
+ * Allocate a new qmt_pool_info structure and add it to qmt_pool_list.
  *
  * \param env       - is the environment passed by the caller
  * \param qmt       - is the quota master target
- * \param pool_id   - is the 16-bit pool identifier of the new pool to add
  * \param pool_type - is the resource type of this pool instance, either
  *                    LQUOTA_RES_MD or LQUOTA_RES_DT.
  *
  * \retval - 0 on success, appropriate error on failure
  */
 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
-                         int pool_id, int pool_type)
+                         char *pool_name, int pool_type)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
        struct qmt_pool_info    *pool;
@@ -244,9 +180,10 @@ static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
        if (pool == NULL)
                RETURN(-ENOMEM);
        INIT_LIST_HEAD(&pool->qpi_linkage);
+       init_rwsem(&pool->qpi_recalc_sem);
 
-       /* assign key used by hash functions */
-       pool->qpi_key = pool_id + (pool_type << 16);
+       pool->qpi_rtype = pool_type;
+       pool->qpi_flags = 0;
 
        /* initialize refcount to 1, hash table will then grab an additional
         * reference */
@@ -259,8 +196,15 @@ static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
        else
                pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
 
+       /* grab reference on master target that this pool belongs to */
+       lu_device_get(qmt2lu_dev(qmt));
+       lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
+       pool->qpi_qmt = qmt;
+
        /* create pool proc directory */
-       sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
+       snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
+                RES_NAME(pool_type), pool_name);
+       strncpy(pool->qpi_name, pool_name, QPI_MAXNAME);
        pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
                                          lprocfs_quota_qpi_vars, pool);
        if (IS_ERR(pool->qpi_proc)) {
@@ -271,22 +215,14 @@ static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
                GOTO(out, rc);
        }
 
-       /* grab reference on master target that this pool belongs to */
-       lu_device_get(qmt2lu_dev(qmt));
-       lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
-       pool->qpi_qmt = qmt;
-
-       /* add to qmt hash table */
-       rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key,
-                                &pool->qpi_hash);
-       if (rc) {
-               CERROR("%s: failed to add pool %s to qmt hash (%d)\n",
-                      qmt->qmt_svname, qti->qti_buf, rc);
+       rc = qmt_sarr_pool_init(pool);
+       if (rc)
                GOTO(out, rc);
-       }
 
        /* add to qmt pool list */
+       down_write(&qmt->qmt_pool_lock);
        list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
+       up_write(&qmt->qmt_pool_lock);
        EXIT;
 out:
        if (rc)
@@ -301,11 +237,23 @@ out:
  * \param env  - is the environment passed by the caller
  * \param pool - is the qmt_pool_info structure to free
  */
-static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
+void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
 {
+       struct  qmt_device *qmt = pool->qpi_qmt;
        int     qtype;
        ENTRY;
 
+       /* remove from list */
+       down_write(&qmt->qmt_pool_lock);
+       list_del_init(&pool->qpi_linkage);
+       up_write(&qmt->qmt_pool_lock);
+
+       if (atomic_read(&pool->qpi_ref) > 0)
+               RETURN_EXIT;
+
+       qmt_stop_pool_recalc(pool);
+       qmt_sarr_pool_free(pool);
+
        /* release proc entry */
        if (pool->qpi_proc) {
                lprocfs_remove(&pool->qpi_proc);
@@ -314,7 +262,7 @@ static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
 
        /* release per-quota type site used to manage quota entries as well as
         * references to global index files */
-       for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
                /* release lqe storing grace time */
                if (pool->qpi_grace_lqe[qtype] != NULL)
                        lqe_putref(pool->qpi_grace_lqe[qtype]);
@@ -326,19 +274,19 @@ static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
                /* release reference to global index */
                if (pool->qpi_glb_obj[qtype] != NULL &&
                    !IS_ERR(pool->qpi_glb_obj[qtype]))
-                       lu_object_put(env, &pool->qpi_glb_obj[qtype]->do_lu);
+                       dt_object_put(env, pool->qpi_glb_obj[qtype]);
        }
 
        /* release reference on pool directory */
        if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
-               lu_object_put(env, &pool->qpi_root->do_lu);
+               dt_object_put(env, pool->qpi_root);
 
        /* release reference on the master target */
        if (pool->qpi_qmt != NULL) {
                struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
 
-               lu_device_put(ld);
                lu_ref_del(&ld->ld_reference, "pool", pool);
+               lu_device_put(ld);
                pool->qpi_qmt = NULL;
        }
 
@@ -346,36 +294,158 @@ static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
        OBD_FREE_PTR(pool);
 }
 
+static inline void qti_pools_init(const struct lu_env *env)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+
+       qti->qti_pools_cnt = 0;
+       qti->qti_pools_num = QMT_MAX_POOL_NUM;
+}
+
+#define qti_pools(qti) (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
+                               qti->qti_pools : qti->qti_pools_small)
+#define qti_pools_env(env) \
+       (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
+               qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
+#define qti_pools_cnt(env)     (qmt_info(env)->qti_pools_cnt)
+
+static inline int qti_pools_add(const struct lu_env *env,
+                               struct qmt_pool_info *qpi)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    **pools = qti->qti_pools;
+
+       pools = qti_pools(qti);
+       LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
+                "Forgot init? %p\n", qti);
+
+       if (qti->qti_pools_cnt >= qti->qti_pools_num) {
+               OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
+               if (!pools)
+                       return -ENOMEM;
+               memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
+               /* Don't need to free, if it is the very 1st allocation */
+               if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
+                       OBD_FREE(qti->qti_pools,
+                                qti->qti_pools_num * sizeof(qpi));
+               qti->qti_pools = pools;
+               qti->qti_pools_num *= 2;
+       }
+
+       qpi_getref(qpi);
+       /* Take this to protect pool's lqes against changing by
+        * recalculation thread. This would be unlocked at
+        * qti_pools_fini. */
+       down_read(&qpi->qpi_recalc_sem);
+       if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
+               pools[qti->qti_pools_cnt++] = pools[0];
+               /* Store global pool always at index 0 */
+               pools[0] = qpi;
+       } else {
+               pools[qti->qti_pools_cnt++] = qpi;
+       }
+
+       CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
+              qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
+
+       return 0;
+}
+
+static inline void qti_pools_fini(const struct lu_env *env)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    **pools = qti->qti_pools;
+       int i;
+
+       LASSERT(qti->qti_pools_cnt > 0);
+
+       pools = qti_pools(qti);
+       for (i = 0; i < qti->qti_pools_cnt; i++) {
+               up_read(&pools[i]->qpi_recalc_sem);
+               qpi_putref(env, pools[i]);
+       }
+
+       if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
+               OBD_FREE(qti->qti_pools,
+                        qti->qti_pools_num * sizeof(struct qmt_pool_info *));
+}
+
 /*
- * Look-up a pool in the hash table based on the pool ID and type.
+ * Look-up a pool in a list based on the type.
  *
- * \param env     - is the environment passed by the caller
- * \param qmt     - is the quota master target
- * \param pool_id   - is the 16-bit identifier of the pool to look up
- * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
+ * \param env  - is the environment passed by the caller
+ * \param qmt  - is the quota master target
+ * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
  *                    LQUOTA_RES_DT.
+ * \param pool_name - is the pool name to search for
+ * \param idx  - OST or MDT index to search for. When it is >= 0, function
+ *             returns array with pointers to all pools that include
+ *             targets with requested index.
+ * \param add  - add to qti_pool_arr if true
  */
-static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
+struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
                                             struct qmt_device *qmt,
-                                            int pool_id, int pool_type)
+                                            int rtype,
+                                            char *pool_name,
+                                            int idx, bool add)
 {
-       struct qmt_pool_info    *pool;
-       __u32                    key;
+       struct qmt_pool_info    *pos, *pool;
+       int rc = 0;
        ENTRY;
 
-       LASSERT(qmt->qmt_pool_hash != NULL);
-
-       /* look-up pool in hash table */
-       key = pool_id + (pool_type << 16);
-       pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key);
-       if (pool == NULL) {
-               /* this qmt isn't managing this pool! */
-               CERROR("%s: looking up quota entry for a pool (0x%x/%d) which "
-                      "isn't managed by this quota master target\n",
-                      qmt->qmt_svname, pool_id, pool_type);
+       down_read(&qmt->qmt_pool_lock);
+       if (list_empty(&qmt->qmt_pool_list)) {
+               up_read(&qmt->qmt_pool_lock);
                RETURN(ERR_PTR(-ENOENT));
        }
-       RETURN(pool);
+
+       CDEBUG(D_QUOTA, "type %d name %s index %d\n",
+              rtype, pool_name ?: "<none>", idx);
+       /* Now just find a pool with correct type in a list. Further we need
+        * to go through the list and find a pool that includes requested OST
+        * or MDT. Possibly this would return a list of pools that includes
+        * needed target(OST/MDT). */
+       pool = NULL;
+       if (idx == -1 && !pool_name)
+               pool_name = GLB_POOL_NAME;
+
+       list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
+               if (pos->qpi_rtype != rtype)
+                       continue;
+
+               if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
+                       rc = qti_pools_add(env, pos);
+                       if (rc)
+                               break;
+                       continue;
+               }
+
+               if (pool_name && !strncmp(pool_name, pos->qpi_name,
+                                         LOV_MAXPOOLNAME)) {
+                       pool = pos;
+                       if (add) {
+                               rc = qti_pools_add(env, pos);
+                               if (rc)
+                                       break;
+                       } else {
+                               qpi_getref(pool);
+                       }
+                       break;
+               }
+       }
+       up_read(&qmt->qmt_pool_lock);
+
+       if (rc)
+               GOTO(out_err, rc);
+
+       if (idx >= 0 && qti_pools_cnt(env))
+               pool = qti_pools_env(env)[0];
+
+       RETURN(pool ? : ERR_PTR(-ENOENT));
+out_err:
+       CERROR("%s: cannot add pool %s: err = %d\n",
+               qmt->qmt_svname, pos->qpi_name, rc);
+       return ERR_PTR(rc);
 }
 
 /*
@@ -383,8 +453,7 @@ static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
  */
 
 /*
- * Destroy all pools which are still in the hash table and free the pool
- * hash table.
+ * Destroy all pools which are still in the pool list.
  *
  * \param env - is the environment passed by the caller
  * \param qmt - is the quota master target
@@ -392,31 +461,18 @@ static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
  */
 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
 {
-       struct qmt_pool_info    *pool;
-       struct list_head        *pos, *n;
+       struct qmt_pool_info *pool, *tmp;
        ENTRY;
 
-       if (qmt->qmt_pool_hash == NULL)
-               RETURN_EXIT;
-
        /* parse list of pool and destroy each element */
-       list_for_each_safe(pos, n, &qmt->qmt_pool_list) {
-               pool = list_entry(pos, struct qmt_pool_info,
-                                 qpi_linkage);
-               /* remove from hash */
-               cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key,
-                            &pool->qpi_hash);
-
-               /* remove from list */
-               list_del_init(&pool->qpi_linkage);
-
+       list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
+               /* stop all recalc threads - it may hold qpi reference */
+               qmt_stop_pool_recalc(pool);
                /* release extra reference taken in qmt_pool_alloc */
                qpi_putref(env, pool);
        }
        LASSERT(list_empty(&qmt->qmt_pool_list));
 
-       cfs_hash_putref(qmt->qmt_pool_hash);
-       qmt->qmt_pool_hash = NULL;
        EXIT;
 }
 
@@ -436,30 +492,14 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
        int     res, rc = 0;
        ENTRY;
 
-       /* initialize pool hash table */
-       qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH",
-                                            HASH_POOLS_CUR_BITS,
-                                            HASH_POOLS_MAX_BITS,
-                                            HASH_POOLS_BKT_BITS, 0,
-                                            CFS_HASH_MIN_THETA,
-                                            CFS_HASH_MAX_THETA,
-                                            &qpi_hash_ops,
-                                            CFS_HASH_DEFAULT);
-       if (qmt->qmt_pool_hash == NULL) {
-               CERROR("%s: failed to create pool hash table\n",
-                      qmt->qmt_svname);
-               RETURN(-ENOMEM);
-       }
-
-       /* initialize pool list */
        INIT_LIST_HEAD(&qmt->qmt_pool_list);
+       init_rwsem(&qmt->qmt_pool_lock);
 
-       /* Instantiate pool master for the default data and metadata pool (both
-        * have pool ID equals to 0).
+       /* Instantiate pool master for the default data and metadata pool.
         * This code will have to be revisited once we support quota on
         * non-default pools */
        for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
-               rc = qmt_pool_alloc(env, qmt, 0, res);
+               rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
                if (rc)
                        break;
        }
@@ -473,10 +513,22 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
                       char *slv_name, struct lu_fid *slv_fid, void *arg)
 {
-       int *nr = arg;
-
+       struct obd_uuid uuid;
+       int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
+       int stype, qtype;
+       int rc;
+
+       rc = lquota_extract_fid(glb_fid, NULL, &qtype);
+       LASSERT(!rc);
+
+       obd_str2uuid(&uuid, slv_name);
+       stype = qmt_uuid2idx(&uuid, NULL);
+       if (stype < 0)
+               return stype;
        /* one more slave */
-       (*nr)++;
+       (*nr)[stype][qtype]++;
+       CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
+                       slv_name, stype, qtype, (*nr)[stype][qtype]);
 
        return 0;
 }
@@ -488,11 +540,13 @@ static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
  * \param qmt - is the quota master target for which we have to initialize the
  *              pool configuration
  * \param qmt_root - is the on-disk directory created for the QMT.
+ * \param name - is the pool name that we need to setup. Setup all pools
+ *              in qmt_pool_list when name is NULL.
  *
  * \retval - 0 on success, appropriate error on failure
  */
 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
-                    struct dt_object *qmt_root)
+                    struct dt_object *qmt_root, char *name)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
        struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
@@ -500,50 +554,53 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
        struct dt_device        *dev = NULL;
        dt_obj_version_t         version;
        struct list_head        *pos;
-       int                      rc = 0, qtype;
+       int                      rc = 0, i, qtype;
        ENTRY;
 
-       LASSERT(qmt->qmt_pool_hash != NULL);
-
-       /* iterate over each pool in the hash and allocate a quota site for each
+       /* iterate over each pool in the list and allocate a quota site for each
         * one. This involves creating a global index file on disk */
        list_for_each(pos, &qmt->qmt_pool_list) {
                struct dt_object        *obj;
-               int                      pool_type, pool_id;
                struct lquota_entry     *lqe;
+               char                    *pool_name;
+               int                      rtype;
 
                pool = list_entry(pos, struct qmt_pool_info,
                                  qpi_linkage);
 
-               pool_id   = pool->qpi_key & 0x0000ffff;
-               pool_type = pool->qpi_key >> 16;
+               pool_name = pool->qpi_name;
+               if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
+                       continue;
+               rtype = pool->qpi_rtype;
                if (dev == NULL)
                        dev = pool->qpi_qmt->qmt_child;
 
                /* allocate directory for this pool */
-               sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
+               snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
+                        RES_NAME(rtype), pool_name);
                obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
                                                  qti->qti_buf);
                if (IS_ERR(obj))
                        RETURN(PTR_ERR(obj));
                pool->qpi_root = obj;
 
-               for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+               for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
                        /* Generating FID of global index in charge of storing
                         * settings for this quota type */
-                       lquota_generate_fid(&qti->qti_fid, pool_id, pool_type,
-                                           qtype);
+                       lquota_generate_fid(&qti->qti_fid, rtype, qtype);
 
                        /* open/create the global index file for this quota
-                        * type */
+                        * type. If name is set, it means we came here from
+                        * qmt_pool_new and can create glb index with a
+                        * local generated FID. */
                        obj = lquota_disk_glb_find_create(env, dev,
                                                          pool->qpi_root,
-                                                         &qti->qti_fid, false);
+                                                         &qti->qti_fid,
+                                                         name ? true : false);
                        if (IS_ERR(obj)) {
                                rc = PTR_ERR(obj);
-                               CERROR("%s: failed to create glb index copy for"
-                                      " %s type (%d)\n", qmt->qmt_svname,
-                                      QTYPE_NAME(qtype), rc);
+                               CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
+                                      qmt->qmt_svname, qtype_name(qtype), rc);
                                RETURN(rc);
                        }
 
@@ -555,24 +612,20 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                                rec->qbr_hardlimit = 0;
                                rec->qbr_softlimit = 0;
                                rec->qbr_granted = 0;
-                               rec->qbr_time = pool_type == LQUOTA_RES_MD ?
+                               rec->qbr_time = rtype == LQUOTA_RES_MD ?
                                        MAX_IQ_TIME : MAX_DQ_TIME;
 
                                rc = lquota_disk_write_glb(env, obj, 0, rec);
                                if (rc) {
-                                       CERROR("%s: failed to set default "
-                                              "grace time for %s type (%d)\n",
-                                              qmt->qmt_svname,
-                                              QTYPE_NAME(qtype), rc);
+                                       CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
+                                              qmt->qmt_svname, qtype_name(qtype), rc);
                                        RETURN(rc);
                                }
 
                                rc = lquota_disk_update_ver(env, dev, obj, 1);
                                if (rc) {
-                                       CERROR("%s: failed to set initial "
-                                              "version for %s type (%d)\n",
-                                              qmt->qmt_svname,
-                                              QTYPE_NAME(qtype), rc);
+                                       CERROR("%s: failed to set initial version for %s type: rc = %d\n",
+                                              qmt->qmt_svname, qtype_name(qtype), rc);
                                        RETURN(rc);
                                }
                        }
@@ -583,23 +636,23 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                                                                  &qmt_lqe_ops);
                        if (IS_ERR(pool->qpi_site[qtype])) {
                                rc = PTR_ERR(pool->qpi_site[qtype]);
-                               CERROR("%s: failed to create site for %s type "
-                                      "(%d)\n", qmt->qmt_svname,
-                                      QTYPE_NAME(qtype), rc);
+                               CERROR("%s: failed to create site for %s type: rc = %d\n",
+                                      qmt->qmt_svname, qtype_name(qtype), rc);
                                RETURN(rc);
                        }
 
                        /* count number of slaves which already connected to
                         * the master in the past */
-                       pool->qpi_slv_nr[qtype] = 0;
+                       for (i = 0; i < QMT_STYPE_CNT; i++)
+                               pool->qpi_slv_nr[i][qtype] = 0;
+
                        rc = lquota_disk_for_each_slv(env, pool->qpi_root,
                                                      &qti->qti_fid,
                                                      qmt_slv_cnt,
-                                                     &pool->qpi_slv_nr[qtype]);
+                                                     &pool->qpi_slv_nr);
                        if (rc) {
-                               CERROR("%s: failed to scan & count slave "
-                                      "indexes for %s type (%d)\n",
-                                      qmt->qmt_svname, QTYPE_NAME(qtype), rc);
+                               CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
+                                      qmt->qmt_svname, qtype_name(qtype), rc);
                                RETURN(rc);
                        }
 
@@ -616,16 +669,19 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
 #ifdef CONFIG_PROC_FS
                        /* add procfs file to dump the global index, mostly for
                         * debugging purpose */
-                       sprintf(qti->qti_buf, "glb-%s", QTYPE_NAME(qtype));
+                       snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
+                                "glb-%s", qtype_name(qtype));
                        rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
                                                0444, &lprocfs_quota_seq_fops,
                                                obj);
                        if (rc)
-                               CWARN("%s: Error adding procfs file for global"
-                                     "quota index "DFID", rc:%d\n",
+                               CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
                                      qmt->qmt_svname, PFID(&qti->qti_fid), rc);
 #endif
                }
+               set_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags);
+               if (name)
+                       break;
        }
 
        RETURN(0);
@@ -651,17 +707,22 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
 {
        struct qmt_pool_info    *pool;
        struct dt_object        *slv_obj;
-       int                      pool_id, pool_type, qtype;
+       int                      pool_type, qtype, stype;
        bool                     created = false;
-       int                      rc = 0;
+       int                      idx, i, rc = 0;
+
+       stype = qmt_uuid2idx(uuid, &idx);
+       if (stype < 0)
+               RETURN(stype);
 
        /* extract pool info from global index FID */
-       rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype);
+       rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
        if (rc)
                RETURN(rc);
 
        /* look-up pool in charge of this global index FID */
-       pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
+       qti_pools_init(env);
+       pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
        if (IS_ERR(pool))
                RETURN(PTR_ERR(pool));
 
@@ -685,11 +746,12 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
        /* retrieve slave fid & current object version */
        memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
        *slv_ver = dt_version_get(env, slv_obj);
-       lu_object_put(env, &slv_obj->do_lu);
+       dt_object_put(env, slv_obj);
        if (created)
-               pool->qpi_slv_nr[qtype]++;
+               for (i = 0; i < qti_pools_cnt(env); i++)
+                       qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
 out:
-       qpi_putref(env, pool);
+       qti_pools_fini(env);
        RETURN(rc);
 }
 
@@ -699,7 +761,6 @@ out:
  * \param env - is the environment passed by the caller
  * \param qmt - is the quota master target for which we have to initialize the
  *              pool configuration
- * \param pool_id   - is the 16-bit identifier of the pool
  * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
  * \param qtype     - is the quota type, either user or group.
  * \param qid       - is the quota ID to look-up
@@ -709,17 +770,18 @@ out:
  */
 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
                                         struct qmt_device *qmt,
-                                        int pool_id, int pool_type,
-                                        int qtype, union lquota_id *qid)
+                                        int pool_type, int qtype,
+                                        union lquota_id *qid,
+                                        char *pool_name)
 {
        struct qmt_pool_info    *pool;
        struct lquota_entry     *lqe;
        ENTRY;
 
        /* look-up pool responsible for this global index FID */
-       pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
+       pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
        if (IS_ERR(pool))
-               RETURN((void *)pool);
+               RETURN(ERR_CAST(pool));
 
        if (qid->qid_uid == 0) {
                /* caller wants to access grace time, no need to look up the
@@ -736,3 +798,827 @@ out:
        qpi_putref(env, pool);
        RETURN(lqe);
 }
+
+int qmt_pool_lqes_lookup(const struct lu_env *env,
+                        struct qmt_device *qmt,
+                        int rtype, int stype,
+                        int qtype, union lquota_id *qid,
+                        char *pool_name, int idx)
+{
+       struct qmt_pool_info    *pool;
+       struct lquota_entry     *lqe;
+       int rc, i;
+       ENTRY;
+
+       /* Until MDT pools are not emplemented, all MDTs belong to
+        * global pool, thus lookup lqes only from global pool. */
+       if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+               idx = -1;
+
+       qti_pools_init(env);
+       rc = 0;
+       /* look-up pool responsible for this global index FID */
+       pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
+       if (IS_ERR(pool)) {
+               qti_pools_fini(env);
+               RETURN(PTR_ERR(pool));
+       }
+
+       /* now that we have the pool, let's look-up the quota entry in the
+        * right quota site */
+       qti_lqes_init(env);
+       for (i = 0; i < qti_pools_cnt(env); i++) {
+               pool = qti_pools_env(env)[i];
+               lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
+               if (IS_ERR(lqe)) {
+                       qti_lqes_fini(env);
+                       GOTO(out, rc = PTR_ERR(lqe));
+               }
+               qti_lqes_add(env, lqe);
+       }
+       LASSERT(qti_lqes_glbl(env)->lqe_is_global);
+
+out:
+       qti_pools_fini(env);
+       RETURN(rc);
+}
+
+static int lqes_cmp(const void *arg1, const void *arg2)
+{
+       const struct lquota_entry *lqe1, *lqe2;
+
+       lqe1 = *(const struct lquota_entry **)arg1;
+       lqe2 = *(const struct lquota_entry **)arg2;
+       if (lqe1->lqe_qunit > lqe2->lqe_qunit)
+               return 1;
+       if (lqe1->lqe_qunit < lqe2->lqe_qunit)
+               return -1;
+       return 0;
+}
+
+void qmt_lqes_sort(const struct lu_env *env)
+{
+       sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
+       /* global lqe was moved during sorting */
+       if (!qti_lqes_glbl(env)->lqe_is_global) {
+               int i;
+               for (i = 0; i < qti_lqes_cnt(env); i++) {
+                       if (qti_lqes(env)[i]->lqe_is_global) {
+                               qti_glbl_lqe_idx(env) = i;
+                               break;
+                       }
+               }
+       }
+}
+
+int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
+                             int rtype, int qtype, union lquota_id *qid)
+{
+       struct qmt_pool_info    *pos;
+       struct lquota_entry     *lqe;
+       int rc = 0;
+
+       qti_lqes_init(env);
+       down_read(&qmt->qmt_pool_lock);
+       if (list_empty(&qmt->qmt_pool_list)) {
+               up_read(&qmt->qmt_pool_lock);
+               RETURN(-ENOENT);
+       }
+
+       list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
+               if (pos->qpi_rtype != rtype)
+                       continue;
+               /* Don't take into account pools without slaves */
+               if (!qpi_slv_nr(pos, qtype))
+                       continue;
+               lqe = lqe_find(env, pos->qpi_site[qtype], qid);
+               /* ENOENT is valid case for lqe from non global pool
+                * that hasn't limits, i.e. not enforced. Continue even
+                * in case of error - we can handle already found lqes */
+               if (IS_ERR_OR_NULL(lqe)) {
+                       /* let know that something went wrong */
+                       rc = lqe ? PTR_ERR(lqe) : -ENOENT;
+                       continue;
+               }
+               if (!lqe->lqe_enforced) {
+                       /* no settings for this qid_uid */
+                       lqe_putref(lqe);
+                       continue;
+               }
+               qti_lqes_add(env, lqe);
+               CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
+                                lqe, pos->qpi_name);
+       }
+       up_read(&qmt->qmt_pool_lock);
+       RETURN(rc);
+}
+
+/**
+ * Allocate a new pool for the specified device.
+ *
+ * Allocate a new pool_desc structure for the specified \a new_pool
+ * device to create a pool with the given \a poolname.  The new pool
+ * structure is created with a single reference, and is freed when the
+ * reference count drops to zero.
+ *
+ * \param[in] obd      Lustre OBD device on which to add a pool iterator
+ * \param[in] poolname the name of the pool to be created
+ *
+ * \retval             0 in case of success
+ * \retval             negative error code in case of error
+ */
+int qmt_pool_new(struct obd_device *obd, char *poolname)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info *qpi;
+       struct lu_env env;
+       int rc;
+       ENTRY;
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (!IS_ERR(qpi)) {
+               /* Valid case when several MDTs are mounted
+                * at the same node. */
+               CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
+               qpi_putref(&env, qpi);
+               GOTO(out_env, rc = -EEXIST);
+       }
+       if (PTR_ERR(qpi) != -ENOENT) {
+               CWARN("%s: pool %s lookup failed: rc = %ld\n",
+                     obd->obd_name, poolname, PTR_ERR(qpi));
+               GOTO(out_env, rc = PTR_ERR(qpi));
+       }
+
+       /* Now allocate and prepare only DATA pool.
+        * Further when MDT pools will be ready we need to add
+        * a cycle here and setup pools of both types. Another
+        * approach is to find out pool of which type should be
+        * created. */
+       rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
+       if (rc) {
+               CERROR("%s: can't alloc pool %s: rc = %d\n",
+                      obd->obd_name, poolname, rc);
+               GOTO(out_env, rc);
+       }
+
+       rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
+       if (rc) {
+               CERROR("%s: can't prepare pool for %s: rc = %d\n",
+                      obd->obd_name, poolname, rc);
+               GOTO(out_err, rc);
+       }
+
+       CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
+              poolname);
+
+       GOTO(out_env, rc);
+out_err:
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (!IS_ERR(qpi)) {
+               qpi_putref(&env, qpi);
+               qpi_putref(&env, qpi);
+       }
+out_env:
+       lu_env_fini(&env);
+       return rc;
+}
+
+static int
+qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
+              struct lquota_site *site)
+{
+       struct qmt_thread_info *qti = qmt_info(env);
+       union lquota_id *qid = &qti->qti_id;
+       const struct dt_it_ops *iops;
+       struct dt_key *key;
+       struct dt_it *it;
+       __u64 granted;
+       int rc;
+       ENTRY;
+
+       iops = &obj->do_index_ops->dio_it;
+
+       it = iops->init(env, obj, 0);
+       if (IS_ERR(it)) {
+               CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
+                     PFID(&qti->qti_fid), PTR_ERR(it));
+               RETURN(PTR_ERR(it));
+       }
+
+       rc = iops->load(env, it, 0);
+       if (rc < 0) {
+               CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
+                     PFID(&qti->qti_fid), rc);
+               GOTO(out, rc);
+       } else if (rc == 0) {
+               rc = iops->next(env, it);
+               if (rc != 0)
+                       GOTO(out, rc = (rc < 0) ? rc : 0);
+       }
+
+       do {
+               struct lquota_entry *lqe;
+
+               key = iops->key(env, it);
+               if (IS_ERR(key)) {
+                       CWARN("quota: error key for "DFID": rc = %ld\n",
+                             PFID(&qti->qti_fid), PTR_ERR(key));
+                       GOTO(out, rc = PTR_ERR(key));
+               }
+
+               /* skip the root user/group */
+               if (*((__u64 *)key) == 0)
+                       goto next;
+
+               qid->qid_uid = *((__u64 *)key);
+
+               rc = qmt_slv_read(env, qid, obj, &granted);
+               if (!granted)
+                       goto next;
+
+               lqe = lqe_locate(env, site, qid);
+               if (IS_ERR(lqe))
+                       GOTO(out, rc = PTR_ERR(lqe));
+               lqe_write_lock(lqe);
+               lqe->lqe_recalc_granted += granted;
+               lqe_write_unlock(lqe);
+               lqe_putref(lqe);
+next:
+               rc = iops->next(env, it);
+               if (rc < 0)
+                       CWARN("quota: failed to parse index "DFID
+                             ", ->next error: rc = %d\n",
+                             PFID(&qti->qti_fid), rc);
+       } while (rc == 0 && !kthread_should_stop());
+
+out:
+       iops->put(env, it);
+       iops->fini(env, it);
+       RETURN(rc);
+}
+
+static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                             struct hlist_node *hnode, void *data)
+{
+       struct lquota_entry     *lqe;
+       struct lu_env *env = data;
+
+       lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+       LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+
+       lqe_write_lock(lqe);
+       if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
+               struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
+               struct thandle *th;
+               bool need_notify = false;
+               int rc;
+
+               LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
+                            lqe->lqe_recalc_granted);
+               lqe->lqe_granted = lqe->lqe_recalc_granted;
+               /* Always returns true, if there is no slaves in a pool */
+               need_notify |= qmt_adjust_qunit(env, lqe);
+               need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
+               if (need_notify) {
+                       /* Find all lqes with lqe_id to reseed lgd array */
+                       rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
+                                               lqe_qtype(lqe), &lqe->lqe_id);
+                       if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
+                               qmt_seed_glbe(env,
+                                       qti_lqes_glbl(env)->lqe_glbl_data);
+                               qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
+                       }
+                       qti_lqes_fini(env);
+               }
+               th = dt_trans_create(env, qmt->qmt_child);
+               if (IS_ERR(th))
+                       goto out;
+
+               rc = lquota_disk_declare_write(env, th,
+                                              LQE_GLB_OBJ(lqe),
+                                              &lqe->lqe_id);
+               if (rc)
+                       GOTO(out_stop, rc);
+
+               rc = dt_trans_start_local(env, qmt->qmt_child, th);
+               if (rc)
+                       GOTO(out_stop, rc);
+
+               qmt_glb_write(env, th, lqe, 0, NULL);
+out_stop:
+               dt_trans_stop(env, qmt->qmt_child, th);
+       }
+out:
+       lqe->lqe_recalc_granted = 0;
+       lqe_write_unlock(lqe);
+
+       return 0;
+}
+
+#define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
+static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
+{
+       char mdt_name[MDT_DEV_NAME_LEN];
+       struct lustre_mount_info *lmi;
+       struct obd_device *obd;
+       int rc;
+       ENTRY;
+
+       rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
+       if (rc) {
+               CERROR("quota: cannot get server name from %s: rc = %d\n",
+                      qmt->qmt_svname, rc);
+               RETURN(ERR_PTR(rc));
+       }
+
+       strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
+       lmi = server_get_mount(mdt_name);
+       if (lmi == NULL) {
+               rc = -ENOENT;
+               CERROR("%s: cannot get mount info from %s: rc = %d\n",
+                      qmt->qmt_svname, mdt_name, rc);
+               RETURN(ERR_PTR(rc));
+       }
+       obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
+       lustre_put_lsi(lmi->lmi_sb);
+
+       RETURN(obd);
+}
+
+static int qmt_pool_recalc(void *args)
+{
+       struct qmt_pool_info *pool, *glbl_pool;
+       struct rw_semaphore *sem = NULL;
+       struct obd_device *obd;
+       struct lu_env env;
+       int i, rc, qtype, slaves_cnt;
+       ENTRY;
+
+       pool = args;
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n",
+                      pool->qpi_qmt->qmt_svname, rc);
+               GOTO(out, rc);
+       }
+
+       obd = qmt_get_mgc(pool->qpi_qmt);
+       if (IS_ERR(obd))
+               GOTO(out, rc = PTR_ERR(obd));
+       else
+               /* Waiting for the end of processing mgs config.
+                * It is needed to be sure all pools are configured. */
+               while (obd->obd_process_conf)
+                       schedule_timeout_uninterruptible(cfs_time_seconds(1));
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val);
+       sem = qmt_sarr_rwsem(pool);
+       LASSERT(sem);
+       down_read(sem);
+       /* Hold this to be sure that OSTs from this pool
+        * can't do acquire/release.
+        *
+        * I guess below write semaphore could be a bottleneck
+        * as qmt_dqacq would be blocked trying to hold
+        * read_lock at qmt_pool_lookup->qti_pools_add.
+        * But on the other hand adding/removing OSTs to the pool is
+        * a rare operation. If finally this would be a problem,
+        * we can consider another approach. For example we can
+        * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
+        * and go through appropriate OSTs. I don't use this approach now
+        * as newly created pool hasn't lqes entries. So firstly we need
+        * to get this lqes from the global pool index file. This
+        * solution looks more complex, so leave it as it is. */
+       down_write(&pool->qpi_recalc_sem);
+
+       glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
+       if (IS_ERR(glbl_pool))
+               GOTO(out, rc = PTR_ERR(glbl_pool));
+
+       slaves_cnt = qmt_sarr_count(pool);
+       CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
+              slaves_cnt, pool->qpi_name);
+
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               for (i = 0; i < slaves_cnt; i++) {
+                       struct qmt_thread_info  *qti = qmt_info(&env);
+                       struct dt_object *slv_obj;
+                       struct obd_uuid uuid;
+                       int idx;
+
+                       if (kthread_should_stop())
+                               GOTO(out_stop, rc = 0);
+                       idx = qmt_sarr_get_idx(pool, i);
+                       LASSERT(idx >= 0);
+
+                       /* We don't need fsname here - anyway
+                        * lquota_disk_slv_filename ignores it. */
+                       snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+                       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+                                           qtype);
+                       /* look-up index file associated with acquiring slave */
+                       slv_obj = lquota_disk_slv_find(&env,
+                                               glbl_pool->qpi_qmt->qmt_child,
+                                               glbl_pool->qpi_root,
+                                               &qti->qti_fid,
+                                               &uuid);
+                       if (IS_ERR(slv_obj))
+                               GOTO(out_stop, rc = PTR_ERR(slv_obj));
+
+                       CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
+                              slv_obj, uuid.uuid);
+                       qmt_obj_recalc(&env, slv_obj, pool->qpi_site[qtype]);
+                       dt_object_put(&env, slv_obj);
+               }
+               /* Now go trough the site hash and compare lqe_granted
+                * with lqe_calc_granted. Write new value if disagree */
+
+               cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
+                                 qmt_site_recalc_cb, &env);
+       }
+       GOTO(out_stop, rc);
+out_stop:
+       qpi_putref(&env, glbl_pool);
+out:
+       if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
+               /*
+                * Someone is waiting for us to stop - be sure not to exit
+                * before kthread_stop() gets a ref on the task.  No event
+                * will happen on 'pool, this is just a convenient way to
+                * wait.
+                */
+               wait_var_event(pool, kthread_should_stop());
+
+       clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
+       /* Pool can't be changed, since sem has been down.
+        * Thus until up_read, no one can restart recalc thread.
+        */
+       if (sem) {
+               up_read(sem);
+               up_write(&pool->qpi_recalc_sem);
+       }
+
+       /* qpi_getref has been called in qmt_start_pool_recalc,
+        * however we can't call qpi_putref if lu_env_init failed.
+        */
+       if (env.le_ctx.lc_state == LCS_ENTERED) {
+               qpi_putref(&env, pool);
+               lu_env_fini(&env);
+       }
+
+       return rc;
+}
+
+static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
+{
+       struct task_struct *task;
+       int rc = 0;
+
+       if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
+               LASSERT(!qpi->qpi_recalc_task);
+
+               qpi_getref(qpi);
+               task = kthread_create(qmt_pool_recalc, qpi,
+                                     "qsd_reint_%s", qpi->qpi_name);
+               if (IS_ERR(task)) {
+                       clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
+                       rc = PTR_ERR(task);
+                       qpi_putref(env, qpi);
+               } else {
+                       qpi->qpi_recalc_task = task;
+                       /* Using park/unpark to start the thread ensures that
+                        * the thread function does get calls, so the
+                        * ref on qpi will be dropped
+                        */
+                       kthread_park(task);
+                       kthread_unpark(task);
+               }
+       }
+
+       RETURN(rc);
+}
+
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
+{
+       struct task_struct *task;
+
+       task = xchg(&qpi->qpi_recalc_task, NULL);
+       if (task)
+               kthread_stop(task);
+}
+
+static int qmt_pool_slv_nr_change(const struct lu_env *env,
+                                 struct qmt_pool_info *pool,
+                                 int idx, bool add)
+{
+       struct qmt_pool_info *glbl_pool;
+       int qtype;
+
+       glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
+       if (IS_ERR(glbl_pool))
+               RETURN(PTR_ERR(glbl_pool));
+
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               struct qmt_thread_info  *qti = qmt_info(env);
+               struct dt_object *slv_obj;
+               struct obd_uuid uuid;
+
+               /* We don't need fsname here - anyway
+                * lquota_disk_slv_filename ignores it. */
+               snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+               lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+                                   qtype);
+               /* look-up index file associated with acquiring slave */
+               slv_obj = lquota_disk_slv_find(env,
+                                       glbl_pool->qpi_qmt->qmt_child,
+                                       glbl_pool->qpi_root,
+                                       &qti->qti_fid,
+                                       &uuid);
+               if (IS_ERR(slv_obj))
+                       continue;
+
+               if (add)
+                       pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
+               else
+                       pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
+               dt_object_put(env, slv_obj);
+       }
+       qpi_putref(env, glbl_pool);
+
+       return 0;
+}
+
+static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
+                           char *slavename, bool add)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info    *qpi;
+       struct lu_env            env;
+       int                      rc, idx;
+       ENTRY;
+
+       if (qmt->qmt_stopping)
+               RETURN(0);
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
+                             "%s: pool %s, removing %s\n",
+             obd->obd_name, poolname, slavename);
+
+       rc = server_name2index(slavename, &idx, NULL);
+       if (rc != LDD_F_SV_TYPE_OST)
+               RETURN(-EINVAL);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (IS_ERR(qpi)) {
+               CWARN("%s: can't find pool %s: rc = %long\n",
+                     obd->obd_name, poolname, PTR_ERR(qpi));
+               GOTO(out, rc = PTR_ERR(qpi));
+       }
+
+       rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
+                  qmt_sarr_pool_rem(qpi, idx);
+       if (rc) {
+               CERROR("%s: can't %s %s pool %s: rc = %d\n",
+                      add ? "add to" : "remove", obd->obd_name,
+                      slavename, poolname, rc);
+               GOTO(out_putref, rc);
+       }
+       qmt_pool_slv_nr_change(&env, qpi, idx, add);
+       qmt_start_pool_recalc(&env, qpi);
+
+out_putref:
+       qpi_putref(&env, qpi);
+out:
+       lu_env_fini(&env);
+       RETURN(rc);
+}
+
+
+
+/**
+ * Add a single target device to the named pool.
+ *
+ * \param[in] obd      OBD device on which to add the pool
+ * \param[in] poolname name of the pool to which to add the target \a slavename
+ * \param[in] slavename        name of the target device to be added
+ *
+ * \retval             0 if \a slavename was (previously) added to the pool
+ * \retval             negative error number on failure
+ */
+int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
+{
+       return qmt_pool_add_rem(obd, poolname, slavename, true);
+}
+
+/**
+ * Remove the named target from the specified pool.
+ *
+ * \param[in] obd      OBD device from which to remove \a poolname
+ * \param[in] poolname name of the pool to be changed
+ * \param[in] slavename        name of the target to remove from \a poolname
+ *
+ * \retval             0 on successfully removing \a slavename from the pool
+ * \retval             negative number on error (e.g. \a slavename not in pool)
+ */
+int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
+{
+       return qmt_pool_add_rem(obd, poolname, slavename, false);
+}
+
+/**
+ * Remove the named pool from the QMT device.
+ *
+ * \param[in] obd      OBD device on which pool was previously created
+ * \param[in] poolname name of pool to remove from \a obd
+ *
+ * \retval             0 on successfully removing the pool
+ * \retval             negative error numbers for failures
+ */
+int qmt_pool_del(struct obd_device *obd, char *poolname)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info    *qpi;
+       struct lu_fid            fid;
+       char                     buf[LQUOTA_NAME_MAX];
+       struct lu_env            env;
+       int                      rc;
+       int                      qtype;
+       ENTRY;
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
+              poolname);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       /* look-up pool in charge of this global index FID */
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (IS_ERR(qpi)) {
+               /* Valid case for several MDTs at the same node -
+                * pool removed by the 1st MDT in config */
+               CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
+               lu_env_fini(&env);
+               RETURN(PTR_ERR(qpi));
+       }
+
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
+               snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
+               rc = local_object_unlink(&env, qmt->qmt_child,
+                                        qpi->qpi_root, buf);
+               if (rc)
+                       CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
+                             obd->obd_name, buf, poolname, rc);
+       }
+
+       /* put ref from look-up */
+       qpi_putref(&env, qpi);
+       /* put last ref to free qpi */
+       qpi_putref(&env, qpi);
+
+       snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
+                RES_NAME(LQUOTA_RES_DT), poolname);
+       rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
+       if (rc)
+               CWARN("%s: cannot unlink dir %s: rc = %d\n",
+                     obd->obd_name, poolname, rc);
+
+       lu_env_fini(&env);
+       RETURN(0);
+}
+
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
+{
+
+       /* No need to initialize sarray for global pool
+        * as it always includes all slaves */
+       if (qmt_pool_global(qpi))
+               return 0;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi)
+{
+       if (qmt_pool_global(qpi))
+               return;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               if (qpi->qpi_sarr.osts.op_array)
+                       lu_tgt_pool_free(&qpi->qpi_sarr.osts);
+               return;
+       case LQUOTA_RES_MD:
+       default:
+               return;
+       }
+}
+
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
+{
+       if (qmt_pool_global(qpi))
+               return 0;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               /* to protect ost_pool use */
+               return &qpi->qpi_sarr.osts.op_rw_sem;
+       case LQUOTA_RES_MD:
+       default:
+               return NULL;
+       }
+}
+
+int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
+{
+
+       if (qmt_pool_global(qpi))
+               return arr_idx;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
+                        "idx invalid %d op_count %d\n", arr_idx,
+                        qpi->qpi_sarr.osts.op_count);
+               return qpi->qpi_sarr.osts.op_array[arr_idx];
+       case LQUOTA_RES_MD:
+       default:
+               return -EINVAL;
+       }
+}
+
+/* Number of slaves in a pool */
+unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return qpi->qpi_sarr.osts.op_count;
+       case LQUOTA_RES_MD:
+       default:
+               return -EINVAL;
+       }
+}