LU-15097 quota: stop pool_recalc before killing pool
[fs/lustre-release.git] lustre/quota/qmt_pool.c
index 87ce912..c4398a5 100644
 #include <lprocfs_status.h>
 #include "qmt_internal.h"
 
-static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
+                                   int idx, int min);
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
+static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi);
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
 
 /*
  * Static helper functions not used outside the scope of this file
  */
 
-/*
- * Reference counter management for qmt_pool_info structures
- */
-static inline void qpi_getref(struct qmt_pool_info *pool)
-{
-       atomic_inc(&pool->qpi_ref);
-}
-
-static inline void qpi_putref(const struct lu_env *env,
-                             struct qmt_pool_info *pool)
-{
-       LASSERT(atomic_read(&pool->qpi_ref) > 0);
-       if (atomic_dec_and_test(&pool->qpi_ref))
-               qmt_pool_free(env, pool);
-}
-
 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
 {
        LASSERT(atomic_read(&pool->qpi_ref) > 1);
@@ -89,6 +79,8 @@ static int qpi_state_seq_show(struct seq_file *m, void *data)
        int                      type;
 
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
        seq_printf(m, "pool:\n"
                   "    id: %u\n"
@@ -105,7 +97,7 @@ static int qpi_state_seq_show(struct seq_file *m, void *data)
                           "        #slv: %d\n"
                           "        #lqe: %d\n",
                           qtype_name(type),
-                          pool->qpi_slv_nr[type],
+                          qpi_slv_nr(pool, type),
                    atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
 
        return 0;
@@ -116,6 +108,8 @@ static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
 {
        struct qmt_pool_info    *pool = m->private;
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
        seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
        return 0;
@@ -125,12 +119,14 @@ static ssize_t
 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
                               size_t count, loff_t *off)
 {
-       struct qmt_pool_info *pool;
-       long long least_qunit;
-       int qunit, rc;
+       struct seq_file *m = file->private_data;
+       struct qmt_pool_info *pool = m->private;
+       long long least_qunit, qunit;
+       int rc;
 
-       pool = ((struct seq_file *)file->private_data)->private;
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
        /* Not tuneable for inode limit */
        if (pool->qpi_rtype != LQUOTA_RES_DT)
@@ -184,8 +180,10 @@ static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
        if (pool == NULL)
                RETURN(-ENOMEM);
        INIT_LIST_HEAD(&pool->qpi_linkage);
+       init_rwsem(&pool->qpi_recalc_sem);
 
        pool->qpi_rtype = pool_type;
+       pool->qpi_flags = 0;
 
        /* initialize refcount to 1, hash table will then grab an additional
         * reference */
@@ -217,10 +215,14 @@ static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
                GOTO(out, rc);
        }
 
+       rc = qmt_sarr_pool_init(pool);
+       if (rc)
+               GOTO(out, rc);
+
        /* add to qmt pool list */
-       write_lock(&qmt->qmt_pool_lock);
+       down_write(&qmt->qmt_pool_lock);
        list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
-       write_unlock(&qmt->qmt_pool_lock);
+       up_write(&qmt->qmt_pool_lock);
        EXIT;
 out:
        if (rc)
@@ -235,20 +237,23 @@ out:
  * \param env  - is the environment passed by the caller
  * \param pool - is the qmt_pool_info structure to free
  */
-static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
+void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
 {
        struct  qmt_device *qmt = pool->qpi_qmt;
        int     qtype;
        ENTRY;
 
        /* remove from list */
-       write_lock(&qmt->qmt_pool_lock);
+       down_write(&qmt->qmt_pool_lock);
        list_del_init(&pool->qpi_linkage);
-       write_unlock(&qmt->qmt_pool_lock);
+       up_write(&qmt->qmt_pool_lock);
 
        if (atomic_read(&pool->qpi_ref) > 0)
                RETURN_EXIT;
 
+       qmt_stop_pool_recalc(pool);
+       qmt_sarr_pool_free(pool);
+
        /* release proc entry */
        if (pool->qpi_proc) {
                lprocfs_remove(&pool->qpi_proc);
@@ -280,8 +285,8 @@ static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
        if (pool->qpi_qmt != NULL) {
                struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
 
-               lu_device_put(ld);
                lu_ref_del(&ld->ld_reference, "pool", pool);
+               lu_device_put(ld);
                pool->qpi_qmt = NULL;
        }
 
@@ -289,42 +294,158 @@ static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
        OBD_FREE_PTR(pool);
 }
 
+static inline void qti_pools_init(const struct lu_env *env)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+
+       qti->qti_pools_cnt = 0;
+       qti->qti_pools_num = QMT_MAX_POOL_NUM;
+}
+
+#define qti_pools(qti) (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
+                               qti->qti_pools : qti->qti_pools_small)
+#define qti_pools_env(env) \
+       (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
+               qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
+#define qti_pools_cnt(env)     (qmt_info(env)->qti_pools_cnt)
+
+static inline int qti_pools_add(const struct lu_env *env,
+                               struct qmt_pool_info *qpi)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    **pools = qti->qti_pools;
+
+       pools = qti_pools(qti);
+       LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
+                "Forgot init? %p\n", qti);
+
+       if (qti->qti_pools_cnt >= qti->qti_pools_num) {
+               OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
+               if (!pools)
+                       return -ENOMEM;
+               memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
+               /* Don't need to free, if it is the very 1st allocation */
+               if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
+                       OBD_FREE(qti->qti_pools,
+                                qti->qti_pools_num * sizeof(qpi));
+               qti->qti_pools = pools;
+               qti->qti_pools_num *= 2;
+       }
+
+       qpi_getref(qpi);
+       /* Take this to protect the pool's lqes against being changed
+        * by the recalculation thread. It is unlocked in
+        * qti_pools_fini(). */
+       down_read(&qpi->qpi_recalc_sem);
+       if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
+               pools[qti->qti_pools_cnt++] = pools[0];
+               /* Store global pool always at index 0 */
+               pools[0] = qpi;
+       } else {
+               pools[qti->qti_pools_cnt++] = qpi;
+       }
+
+       CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
+              qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
+
+       return 0;
+}
+
+static inline void qti_pools_fini(const struct lu_env *env)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    **pools = qti->qti_pools;
+       int i;
+
+       LASSERT(qti->qti_pools_cnt > 0);
+
+       pools = qti_pools(qti);
+       for (i = 0; i < qti->qti_pools_cnt; i++) {
+               up_read(&pools[i]->qpi_recalc_sem);
+               qpi_putref(env, pools[i]);
+       }
+
+       if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
+               OBD_FREE(qti->qti_pools,
+                        qti->qti_pools_num * sizeof(struct qmt_pool_info *));
+}
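
/*
 * A typical caller pattern for the qti_pools_* helpers above - a minimal
 * sketch based on how qmt_pool_new_conn() and qmt_pool_lqes_lookup()
 * further down use them (do_something() is only a placeholder).  Every
 * pool collected by qti_pools_add() is returned with a reference held and
 * with qpi_recalc_sem taken for read; both are dropped by qti_pools_fini():
 *
 *	qti_pools_init(env);
 *	pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
 *	if (IS_ERR(pool))
 *		RETURN(PTR_ERR(pool));
 *	for (i = 0; i < qti_pools_cnt(env); i++)
 *		do_something(qti_pools_env(env)[i]);
 *	qti_pools_fini(env);
 */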
+
 /*
  * Look-up a pool in a list based on the type.
  *
- * \param env     - is the environment passed by the caller
- * \param qmt     - is the quota master target
- * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
+ * \param env  - is the environment passed by the caller
+ * \param qmt  - is the quota master target
+ * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
  *                    LQUOTA_RES_DT.
+ * \param pool_name - is the pool name to search for
+ * \param idx  - OST or MDT index to search for. When it is >= 0, the
+ *             function fills the per-thread qti_pools array with pointers
+ *             to all pools that include the target with the requested
+ *             index.
+ * \param add  - add the found pool(s) to the qti_pools array if true
  */
-static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
+struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
                                             struct qmt_device *qmt,
-                                            int pool_type)
+                                            int rtype,
+                                            char *pool_name,
+                                            int idx, bool add)
 {
        struct qmt_pool_info    *pos, *pool;
+       int rc = 0;
        ENTRY;
 
-       read_lock(&qmt->qmt_pool_lock);
+       down_read(&qmt->qmt_pool_lock);
        if (list_empty(&qmt->qmt_pool_list)) {
-               read_unlock(&qmt->qmt_pool_lock);
+               up_read(&qmt->qmt_pool_lock);
                RETURN(ERR_PTR(-ENOENT));
        }
 
+       CDEBUG(D_QUOTA, "type %d name %s index %d\n",
+              rtype, pool_name ?: "<none>", idx);
        /* Now just find a pool with correct type in a list. Further we need
         * to go through the list and find a pool that includes requested OST
         * or MDT. Possibly this would return a list of pools that includes
         * needed target(OST/MDT). */
        pool = NULL;
+       if (idx == -1 && !pool_name)
+               pool_name = GLB_POOL_NAME;
+
        list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
-               if (pos->qpi_rtype == pool_type) {
+               if (pos->qpi_rtype != rtype)
+                       continue;
+
+               if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
+                       rc = qti_pools_add(env, pos);
+                       if (rc)
+                               break;
+                       continue;
+               }
+
+               if (pool_name && !strncmp(pool_name, pos->qpi_name,
+                                         LOV_MAXPOOLNAME)) {
                        pool = pos;
-                       qpi_getref(pool);
+                       if (add) {
+                               rc = qti_pools_add(env, pos);
+                               if (rc)
+                                       break;
+                       } else {
+                               qpi_getref(pool);
+                       }
                        break;
                }
        }
-       read_unlock(&qmt->qmt_pool_lock);
+       up_read(&qmt->qmt_pool_lock);
+
+       if (rc)
+               GOTO(out_err, rc);
 
-       RETURN(pool);
+       if (idx >= 0 && qti_pools_cnt(env))
+               pool = qti_pools_env(env)[0];
+
+       RETURN(pool ? : ERR_PTR(-ENOENT));
+out_err:
+       CERROR("%s: cannot add pool %s: err = %d\n",
+               qmt->qmt_svname, pos->qpi_name, rc);
+       return ERR_PTR(rc);
 }
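
/*
 * Callers below use qmt_pool_lookup_glb()/_name()/_arr().  These are
 * presumably thin wrappers over qmt_pool_lookup(), defined in
 * qmt_internal.h roughly along these lines:
 *
 *	#define qmt_pool_lookup_glb(env, qmt, type) \
 *		qmt_pool_lookup(env, qmt, type, NULL, -1, false)
 *	#define qmt_pool_lookup_name(env, qmt, type, name) \
 *		qmt_pool_lookup(env, qmt, type, name, -1, false)
 *	#define qmt_pool_lookup_arr(env, qmt, type, idx) \
 *		qmt_pool_lookup(env, qmt, type, NULL, idx, true)
 *
 * i.e. the global pool is found via GLB_POOL_NAME (no name, idx == -1),
 * a named pool by its name, and the "_arr" variant collects every pool
 * containing target index idx into the per-thread qti_pools array.
 */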
 
 /*
@@ -345,6 +466,8 @@ void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
 
        /* parse list of pool and destroy each element */
        list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
+               /* stop all recalc threads - it may hold qpi reference */
+               qmt_stop_pool_recalc(pool);
                /* release extra reference taken in qmt_pool_alloc */
                qpi_putref(env, pool);
        }
@@ -370,13 +493,13 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
        ENTRY;
 
        INIT_LIST_HEAD(&qmt->qmt_pool_list);
-       rwlock_init(&qmt->qmt_pool_lock);
+       init_rwsem(&qmt->qmt_pool_lock);
 
        /* Instantiate pool master for the default data and metadata pool.
         * This code will have to be revisited once we support quota on
         * non-default pools */
        for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
-               rc = qmt_pool_alloc(env, qmt, "0x0", res);
+               rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
                if (rc)
                        break;
        }
@@ -390,10 +513,22 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
                       char *slv_name, struct lu_fid *slv_fid, void *arg)
 {
-       int *nr = arg;
-
+       struct obd_uuid uuid;
+       int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
+       int stype, qtype;
+       int rc;
+
+       rc = lquota_extract_fid(glb_fid, NULL, &qtype);
+       LASSERT(!rc);
+
+       obd_str2uuid(&uuid, slv_name);
+       stype = qmt_uuid2idx(&uuid, NULL);
+       if (stype < 0)
+               return stype;
        /* one more slave */
-       (*nr)++;
+       (*nr)[stype][qtype]++;
+       CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
+                       slv_name, stype, qtype, (*nr)[stype][qtype]);
 
        return 0;
 }
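
/*
 * qmt_slv_cnt() now fills a per-slave-type / per-quota-type matrix rather
 * than a single counter.  A minimal usage sketch, mirroring the
 * qmt_pool_prepare() hunk below and assuming qpi_slv_nr is declared as
 * int qpi_slv_nr[QMT_STYPE_CNT][LL_MAXQUOTAS]:
 *
 *	int nr[QMT_STYPE_CNT][LL_MAXQUOTAS] = { { 0 } };
 *
 *	rc = lquota_disk_for_each_slv(env, pool->qpi_root, &qti->qti_fid,
 *				      qmt_slv_cnt, &nr);
 *
 * On success nr[QMT_STYPE_OST][USRQUOTA] etc. hold the number of slaves
 * of each type that have connected in the past for each quota type.
 */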
@@ -405,11 +540,13 @@ static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
  * \param qmt - is the quota master target for which we have to initialize the
  *              pool configuration
  * \param qmt_root - is the on-disk directory created for the QMT.
+ * \param name - is the pool name that we need to setup. Setup all pools
+ *              in qmt_pool_list when name is NULL.
  *
  * \retval - 0 on success, appropriate error on failure
  */
 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
-                    struct dt_object *qmt_root)
+                    struct dt_object *qmt_root, char *name)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
        struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
@@ -417,7 +554,7 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
        struct dt_device        *dev = NULL;
        dt_obj_version_t         version;
        struct list_head        *pos;
-       int                      rc = 0, qtype;
+       int                      rc = 0, i, qtype;
        ENTRY;
 
        /* iterate over each pool in the list and allocate a quota site for each
@@ -426,19 +563,21 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                struct dt_object        *obj;
                struct lquota_entry     *lqe;
                char                    *pool_name;
-               int                      pool_type;
+               int                      rtype;
 
                pool = list_entry(pos, struct qmt_pool_info,
                                  qpi_linkage);
 
                pool_name = pool->qpi_name;
-               pool_type = pool->qpi_rtype;
+               if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
+                       continue;
+               rtype = pool->qpi_rtype;
                if (dev == NULL)
                        dev = pool->qpi_qmt->qmt_child;
 
                /* allocate directory for this pool */
                snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
-                        RES_NAME(pool_type), pool_name);
+                        RES_NAME(rtype), pool_name);
                obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
                                                  qti->qti_buf);
                if (IS_ERR(obj))
@@ -448,13 +587,16 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
                        /* Generating FID of global index in charge of storing
                         * settings for this quota type */
-                       lquota_generate_fid(&qti->qti_fid, pool_type, qtype);
+                       lquota_generate_fid(&qti->qti_fid, rtype, qtype);
 
                        /* open/create the global index file for this quota
-                        * type */
+                        * type. If name is set, it means we came here from
+                        * qmt_pool_new and can create glb index with a
+                        * locally generated FID. */
                        obj = lquota_disk_glb_find_create(env, dev,
                                                          pool->qpi_root,
-                                                         &qti->qti_fid, false);
+                                                         &qti->qti_fid,
+                                                         name ? true : false);
                        if (IS_ERR(obj)) {
                                rc = PTR_ERR(obj);
                                CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
@@ -470,7 +612,7 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                                rec->qbr_hardlimit = 0;
                                rec->qbr_softlimit = 0;
                                rec->qbr_granted = 0;
-                               rec->qbr_time = pool_type == LQUOTA_RES_MD ?
+                               rec->qbr_time = rtype == LQUOTA_RES_MD ?
                                        MAX_IQ_TIME : MAX_DQ_TIME;
 
                                rc = lquota_disk_write_glb(env, obj, 0, rec);
@@ -501,11 +643,13 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
 
                        /* count number of slaves which already connected to
                         * the master in the past */
-                       pool->qpi_slv_nr[qtype] = 0;
+                       for (i = 0; i < QMT_STYPE_CNT; i++)
+                               pool->qpi_slv_nr[i][qtype] = 0;
+
                        rc = lquota_disk_for_each_slv(env, pool->qpi_root,
                                                      &qti->qti_fid,
                                                      qmt_slv_cnt,
-                                                     &pool->qpi_slv_nr[qtype]);
+                                                     &pool->qpi_slv_nr);
                        if (rc) {
                                CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
                                       qmt->qmt_svname, qtype_name(qtype), rc);
@@ -531,11 +675,13 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                                                0444, &lprocfs_quota_seq_fops,
                                                obj);
                        if (rc)
-                               CWARN("%s: Error adding procfs file for global"
-                                     "quota index "DFID", rc:%d\n",
+                               CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
                                      qmt->qmt_svname, PFID(&qti->qti_fid), rc);
 #endif
                }
+               set_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags);
+               if (name)
+                       break;
        }
 
        RETURN(0);
@@ -561,9 +707,13 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
 {
        struct qmt_pool_info    *pool;
        struct dt_object        *slv_obj;
-       int                      pool_type, qtype;
+       int                      pool_type, qtype, stype;
        bool                     created = false;
-       int                      rc = 0;
+       int                      idx, i, rc = 0;
+
+       stype = qmt_uuid2idx(uuid, &idx);
+       if (stype < 0)
+               RETURN(stype);
 
        /* extract pool info from global index FID */
        rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
@@ -571,7 +721,8 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
                RETURN(rc);
 
        /* look-up pool in charge of this global index FID */
-       pool = qmt_pool_lookup(env, qmt, pool_type);
+       qti_pools_init(env);
+       pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
        if (IS_ERR(pool))
                RETURN(PTR_ERR(pool));
 
@@ -597,9 +748,10 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
        *slv_ver = dt_version_get(env, slv_obj);
        dt_object_put(env, slv_obj);
        if (created)
-               pool->qpi_slv_nr[qtype]++;
+               for (i = 0; i < qti_pools_cnt(env); i++)
+                       qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
 out:
-       qpi_putref(env, pool);
+       qti_pools_fini(env);
        RETURN(rc);
 }
 
@@ -619,16 +771,17 @@ out:
 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
                                         struct qmt_device *qmt,
                                         int pool_type, int qtype,
-                                        union lquota_id *qid)
+                                        union lquota_id *qid,
+                                        char *pool_name)
 {
        struct qmt_pool_info    *pool;
        struct lquota_entry     *lqe;
        ENTRY;
 
        /* look-up pool responsible for this global index FID */
-       pool = qmt_pool_lookup(env, qmt, pool_type);
+       pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
        if (IS_ERR(pool))
-               RETURN((void *)pool);
+               RETURN(ERR_CAST(pool));
 
        if (qid->qid_uid == 0) {
                /* caller wants to access grace time, no need to look up the
@@ -645,3 +798,827 @@ out:
        qpi_putref(env, pool);
        RETURN(lqe);
 }
+
+int qmt_pool_lqes_lookup(const struct lu_env *env,
+                        struct qmt_device *qmt,
+                        int rtype, int stype,
+                        int qtype, union lquota_id *qid,
+                        char *pool_name, int idx)
+{
+       struct qmt_pool_info    *pool;
+       struct lquota_entry     *lqe;
+       int rc, i;
+       ENTRY;
+
+       /* Until MDT pools are implemented, all MDTs belong to the
+        * global pool, thus look up lqes only in the global pool. */
+       if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+               idx = -1;
+
+       qti_pools_init(env);
+       rc = 0;
+       /* look-up pool responsible for this global index FID */
+       pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
+       if (IS_ERR(pool)) {
+               qti_pools_fini(env);
+               RETURN(PTR_ERR(pool));
+       }
+
+       /* now that we have the pool, let's look-up the quota entry in the
+        * right quota site */
+       qti_lqes_init(env);
+       for (i = 0; i < qti_pools_cnt(env); i++) {
+               pool = qti_pools_env(env)[i];
+               lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
+               if (IS_ERR(lqe)) {
+                       qti_lqes_fini(env);
+                       GOTO(out, rc = PTR_ERR(lqe));
+               }
+               qti_lqes_add(env, lqe);
+       }
+       LASSERT(qti_lqes_glbl(env)->lqe_is_global);
+
+out:
+       qti_pools_fini(env);
+       RETURN(rc);
+}
+
+static int lqes_cmp(const void *arg1, const void *arg2)
+{
+       const struct lquota_entry *lqe1, *lqe2;
+
+       lqe1 = *(const struct lquota_entry **)arg1;
+       lqe2 = *(const struct lquota_entry **)arg2;
+       if (lqe1->lqe_qunit > lqe2->lqe_qunit)
+               return 1;
+       if (lqe1->lqe_qunit < lqe2->lqe_qunit)
+               return -1;
+       return 0;
+}
+
+void qmt_lqes_sort(const struct lu_env *env)
+{
+       sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
+       /* global lqe was moved during sorting */
+       if (!qti_lqes_glbl(env)->lqe_is_global) {
+               int i;
+               for (i = 0; i < qti_lqes_cnt(env); i++) {
+                       if (qti_lqes(env)[i]->lqe_is_global) {
+                               qti_glbl_lqe_idx(env) = i;
+                               break;
+                       }
+               }
+       }
+}
+
+int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
+                             int rtype, int qtype, union lquota_id *qid)
+{
+       struct qmt_pool_info    *pos;
+       struct lquota_entry     *lqe;
+       int rc = 0;
+
+       qti_lqes_init(env);
+       down_read(&qmt->qmt_pool_lock);
+       if (list_empty(&qmt->qmt_pool_list)) {
+               up_read(&qmt->qmt_pool_lock);
+               RETURN(-ENOENT);
+       }
+
+       list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
+               if (pos->qpi_rtype != rtype)
+                       continue;
+               /* Don't take into account pools without slaves */
+               if (!qpi_slv_nr(pos, qtype))
+                       continue;
+               lqe = lqe_find(env, pos->qpi_site[qtype], qid);
+               /* ENOENT is a valid case for an lqe from a non-global
+                * pool that has no limits set, i.e. is not enforced.
+                * Continue even in case of error - we can handle the
+                * lqes found so far. */
+               if (IS_ERR_OR_NULL(lqe)) {
+                       /* let know that something went wrong */
+                       rc = lqe ? PTR_ERR(lqe) : -ENOENT;
+                       continue;
+               }
+               if (!lqe->lqe_enforced) {
+                       /* no settings for this qid_uid */
+                       lqe_putref(lqe);
+                       continue;
+               }
+               qti_lqes_add(env, lqe);
+               CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
+                                lqe, pos->qpi_name);
+       }
+       up_read(&qmt->qmt_pool_lock);
+       RETURN(rc);
+}
+
+/**
+ * Allocate a new pool for the specified device.
+ *
+ * Allocate a new pool_desc structure for the specified \a new_pool
+ * device to create a pool with the given \a poolname.  The new pool
+ * structure is created with a single reference, and is freed when the
+ * reference count drops to zero.
+ *
+ * \param[in] obd      Lustre OBD device on which to add a pool iterator
+ * \param[in] poolname the name of the pool to be created
+ *
+ * \retval             0 in case of success
+ * \retval             negative error code in case of error
+ */
+int qmt_pool_new(struct obd_device *obd, char *poolname)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info *qpi;
+       struct lu_env env;
+       int rc;
+       ENTRY;
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (!IS_ERR(qpi)) {
+               /* Valid case when several MDTs are mounted
+                * at the same node. */
+               CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
+               qpi_putref(&env, qpi);
+               GOTO(out_env, rc = -EEXIST);
+       }
+       if (PTR_ERR(qpi) != -ENOENT) {
+               CWARN("%s: pool %s lookup failed: rc = %ld\n",
+                     obd->obd_name, poolname, PTR_ERR(qpi));
+               GOTO(out_env, rc = PTR_ERR(qpi));
+       }
+
+       /* Now allocate and prepare only the DATA pool.
+        * Later, when MDT pools are ready, we will need to add
+        * a loop here and set up pools of both types. Another
+        * approach is to find out which type of pool should be
+        * created. */
+       rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
+       if (rc) {
+               CERROR("%s: can't alloc pool %s: rc = %d\n",
+                      obd->obd_name, poolname, rc);
+               GOTO(out_env, rc);
+       }
+
+       rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
+       if (rc) {
+               CERROR("%s: can't prepare pool for %s: rc = %d\n",
+                      obd->obd_name, poolname, rc);
+               GOTO(out_err, rc);
+       }
+
+       CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
+              poolname);
+
+       GOTO(out_env, rc);
+out_err:
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (!IS_ERR(qpi)) {
+               qpi_putref(&env, qpi);
+               qpi_putref(&env, qpi);
+       }
+out_env:
+       lu_env_fini(&env);
+       return rc;
+}
+
+static int
+qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
+              struct lquota_site *site)
+{
+       struct qmt_thread_info *qti = qmt_info(env);
+       union lquota_id *qid = &qti->qti_id;
+       const struct dt_it_ops *iops;
+       struct dt_key *key;
+       struct dt_it *it;
+       __u64 granted;
+       int rc;
+       ENTRY;
+
+       iops = &obj->do_index_ops->dio_it;
+
+       it = iops->init(env, obj, 0);
+       if (IS_ERR(it)) {
+               CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
+                     PFID(&qti->qti_fid), PTR_ERR(it));
+               RETURN(PTR_ERR(it));
+       }
+
+       rc = iops->load(env, it, 0);
+       if (rc < 0) {
+               CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
+                     PFID(&qti->qti_fid), rc);
+               GOTO(out, rc);
+       } else if (rc == 0) {
+               rc = iops->next(env, it);
+               if (rc != 0)
+                       GOTO(out, rc = (rc < 0) ? rc : 0);
+       }
+
+       do {
+               struct lquota_entry *lqe;
+
+               key = iops->key(env, it);
+               if (IS_ERR(key)) {
+                       CWARN("quota: error key for "DFID": rc = %ld\n",
+                             PFID(&qti->qti_fid), PTR_ERR(key));
+                       GOTO(out, rc = PTR_ERR(key));
+               }
+
+               /* skip the root user/group */
+               if (*((__u64 *)key) == 0)
+                       goto next;
+
+               qid->qid_uid = *((__u64 *)key);
+
+               rc = qmt_slv_read(env, qid, obj, &granted);
+               if (!granted)
+                       goto next;
+
+               lqe = lqe_locate(env, site, qid);
+               if (IS_ERR(lqe))
+                       GOTO(out, rc = PTR_ERR(lqe));
+               lqe_write_lock(lqe);
+               lqe->lqe_recalc_granted += granted;
+               lqe_write_unlock(lqe);
+               lqe_putref(lqe);
+next:
+               rc = iops->next(env, it);
+               if (rc < 0)
+                       CWARN("quota: failed to parse index "DFID
+                             ", ->next error: rc = %d\n",
+                             PFID(&qti->qti_fid), rc);
+       } while (rc == 0 && !kthread_should_stop());
+
+out:
+       iops->put(env, it);
+       iops->fini(env, it);
+       RETURN(rc);
+}
+
+static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                             struct hlist_node *hnode, void *data)
+{
+       struct lquota_entry     *lqe;
+       struct lu_env *env = data;
+
+       lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+       LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+
+       lqe_write_lock(lqe);
+       if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
+               struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
+               struct thandle *th;
+               bool need_notify = false;
+               int rc;
+
+               LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
+                            lqe->lqe_recalc_granted);
+               lqe->lqe_granted = lqe->lqe_recalc_granted;
+               /* Always returns true if there are no slaves in a pool */
+               need_notify |= qmt_adjust_qunit(env, lqe);
+               need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
+               if (need_notify) {
+                       /* Find all lqes with lqe_id to reseed lgd array */
+                       rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
+                                               lqe_qtype(lqe), &lqe->lqe_id);
+                       if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
+                               qmt_seed_glbe(env,
+                                       qti_lqes_glbl(env)->lqe_glbl_data);
+                               qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
+                       }
+                       qti_lqes_fini(env);
+               }
+               th = dt_trans_create(env, qmt->qmt_child);
+               if (IS_ERR(th))
+                       goto out;
+
+               rc = lquota_disk_declare_write(env, th,
+                                              LQE_GLB_OBJ(lqe),
+                                              &lqe->lqe_id);
+               if (rc)
+                       GOTO(out_stop, rc);
+
+               rc = dt_trans_start_local(env, qmt->qmt_child, th);
+               if (rc)
+                       GOTO(out_stop, rc);
+
+               qmt_glb_write(env, th, lqe, 0, NULL);
+out_stop:
+               dt_trans_stop(env, qmt->qmt_child, th);
+       }
+out:
+       lqe->lqe_recalc_granted = 0;
+       lqe_write_unlock(lqe);
+
+       return 0;
+}
+
+#define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
+static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
+{
+       char mdt_name[MDT_DEV_NAME_LEN];
+       struct lustre_mount_info *lmi;
+       struct obd_device *obd;
+       int rc;
+       ENTRY;
+
+       rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
+       if (rc) {
+               CERROR("quota: cannot get server name from %s: rc = %d\n",
+                      qmt->qmt_svname, rc);
+               RETURN(ERR_PTR(rc));
+       }
+
+       strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
+       lmi = server_get_mount(mdt_name);
+       if (lmi == NULL) {
+               rc = -ENOENT;
+               CERROR("%s: cannot get mount info from %s: rc = %d\n",
+                      qmt->qmt_svname, mdt_name, rc);
+               RETURN(ERR_PTR(rc));
+       }
+       obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
+       lustre_put_lsi(lmi->lmi_sb);
+
+       RETURN(obd);
+}
+
+static int qmt_pool_recalc(void *args)
+{
+       struct qmt_pool_info *pool, *glbl_pool;
+       struct rw_semaphore *sem = NULL;
+       struct obd_device *obd;
+       struct lu_env env;
+       int i, rc, qtype, slaves_cnt;
+       ENTRY;
+
+       pool = args;
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n",
+                      pool->qpi_qmt->qmt_svname, rc);
+               GOTO(out, rc);
+       }
+
+       obd = qmt_get_mgc(pool->qpi_qmt);
+       if (IS_ERR(obd))
+               GOTO(out, rc = PTR_ERR(obd));
+       else
+               /* Wait for the end of MGS config processing.
+                * This is needed to be sure all pools are configured. */
+               while (obd->obd_process_conf)
+                       schedule_timeout_uninterruptible(cfs_time_seconds(1));
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val);
+       sem = qmt_sarr_rwsem(pool);
+       LASSERT(sem);
+       down_read(sem);
+       /* Hold this to be sure that OSTs from this pool
+        * can't do acquire/release.
+        *
+        * The write semaphore below could become a bottleneck,
+        * since qmt_dqacq would be blocked trying to take the
+        * read lock in qmt_pool_lookup->qti_pools_add.
+        * On the other hand, adding/removing OSTs to/from a pool is
+        * a rare operation. If this ever becomes a problem,
+        * another approach could be considered: for example,
+        * iterate through the pool's lqes - take an lqe, hold
+        * lqe_write_lock and go through the appropriate OSTs. That
+        * approach is not used now because a newly created pool has
+        * no lqe entries, so they would first have to be read from
+        * the global pool index file, which looks more complex,
+        * so it is left as is. */
+       down_write(&pool->qpi_recalc_sem);
+
+       glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
+       if (IS_ERR(glbl_pool))
+               GOTO(out, rc = PTR_ERR(glbl_pool));
+
+       slaves_cnt = qmt_sarr_count(pool);
+       CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
+              slaves_cnt, pool->qpi_name);
+
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               for (i = 0; i < slaves_cnt; i++) {
+                       struct qmt_thread_info  *qti = qmt_info(&env);
+                       struct dt_object *slv_obj;
+                       struct obd_uuid uuid;
+                       int idx;
+
+                       if (kthread_should_stop())
+                               GOTO(out_stop, rc = 0);
+                       idx = qmt_sarr_get_idx(pool, i);
+                       LASSERT(idx >= 0);
+
+                       /* We don't need fsname here - anyway
+                        * lquota_disk_slv_filename ignores it. */
+                       snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+                       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+                                           qtype);
+                       /* look-up index file associated with acquiring slave */
+                       slv_obj = lquota_disk_slv_find(&env,
+                                               glbl_pool->qpi_qmt->qmt_child,
+                                               glbl_pool->qpi_root,
+                                               &qti->qti_fid,
+                                               &uuid);
+                       if (IS_ERR(slv_obj))
+                               GOTO(out_stop, rc = PTR_ERR(slv_obj));
+
+                       CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
+                              slv_obj, uuid.uuid);
+                       qmt_obj_recalc(&env, slv_obj, pool->qpi_site[qtype]);
+                       dt_object_put(&env, slv_obj);
+               }
+               /* Now go through the site hash and compare lqe_granted
+                * with lqe_recalc_granted. Write the new value if they
+                * disagree. */
+
+               cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
+                                 qmt_site_recalc_cb, &env);
+       }
+       GOTO(out_stop, rc);
+out_stop:
+       qpi_putref(&env, glbl_pool);
+out:
+       if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
+               /*
+                * Someone is waiting for us to stop - be sure not to exit
+                * before kthread_stop() gets a ref on the task.  No event
+                * will happen on 'pool', this is just a convenient way to
+                * wait.
+                */
+               wait_var_event(pool, kthread_should_stop());
+
+       clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
+       /* The pool can't be changed while the semaphore is held.
+        * Thus, until up_read, no one can restart the recalc thread.
+        */
+       if (sem) {
+               up_read(sem);
+               up_write(&pool->qpi_recalc_sem);
+       }
+
+       /* qpi_getref has been called in qmt_start_pool_recalc,
+        * however we can't call qpi_putref if lu_env_init failed.
+        */
+       if (env.le_ctx.lc_state == LCS_ENTERED) {
+               qpi_putref(&env, pool);
+               lu_env_fini(&env);
+       }
+
+       return rc;
+}
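
/*
 * Locking in qmt_pool_recalc() vs. the acquire/release path, as described
 * in the comments above: the recalc thread takes the slave-array rwsem for
 * read and then qpi_recalc_sem for write, while qmt_pool_lookup() ->
 * qti_pools_add() takes qpi_recalc_sem for read on behalf of quota
 * requests.  So while recalculation runs, requests against this pool block
 * in qti_pools_add(), and pool membership cannot change because the
 * slave-array rwsem is held.
 */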
+
+static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
+{
+       struct task_struct *task;
+       int rc = 0;
+
+       if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
+               LASSERT(!qpi->qpi_recalc_task);
+
+               qpi_getref(qpi);
+               task = kthread_create(qmt_pool_recalc, qpi,
+                                     "qsd_reint_%s", qpi->qpi_name);
+               if (IS_ERR(task)) {
+                       clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
+                       rc = PTR_ERR(task);
+                       qpi_putref(env, qpi);
+               } else {
+                       qpi->qpi_recalc_task = task;
+                       /* Using park/unpark to start the thread ensures that
+                        * the thread function does get called, so the
+                        * ref on qpi will be dropped.
+                        */
+                       kthread_park(task);
+                       kthread_unpark(task);
+               }
+       }
+
+       RETURN(rc);
+}
+
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
+{
+       struct task_struct *task;
+
+       task = xchg(&qpi->qpi_recalc_task, NULL);
+       if (task)
+               kthread_stop(task);
+}
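
/*
 * Start/stop protocol for the recalc thread (a summary of the functions
 * above): qpi_recalc_task is claimed exactly once via xchg(), so the two
 * sides cannot race:
 *
 *   - if qmt_stop_pool_recalc() gets the task pointer first, it calls
 *     kthread_stop(); the exiting thread, whose own xchg() then returns
 *     NULL, parks in wait_var_event() until kthread_stop() has taken a
 *     reference on the task, so the task_struct cannot go away under the
 *     stopper;
 *   - if the thread completes first and clears qpi_recalc_task itself, a
 *     later qmt_stop_pool_recalc() sees NULL and does nothing.
 *
 * QPI_FLAG_RECALC_OFFSET prevents starting a second recalc thread while
 * one is still running for the same pool.
 */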
+
+static int qmt_pool_slv_nr_change(const struct lu_env *env,
+                                 struct qmt_pool_info *pool,
+                                 int idx, bool add)
+{
+       struct qmt_pool_info *glbl_pool;
+       int qtype;
+
+       glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
+       if (IS_ERR(glbl_pool))
+               RETURN(PTR_ERR(glbl_pool));
+
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               struct qmt_thread_info  *qti = qmt_info(env);
+               struct dt_object *slv_obj;
+               struct obd_uuid uuid;
+
+               /* We don't need fsname here - anyway
+                * lquota_disk_slv_filename ignores it. */
+               snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+               lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+                                   qtype);
+               /* look-up index file associated with acquiring slave */
+               slv_obj = lquota_disk_slv_find(env,
+                                       glbl_pool->qpi_qmt->qmt_child,
+                                       glbl_pool->qpi_root,
+                                       &qti->qti_fid,
+                                       &uuid);
+               if (IS_ERR(slv_obj))
+                       continue;
+
+               if (add)
+                       pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
+               else
+                       pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
+               dt_object_put(env, slv_obj);
+       }
+       qpi_putref(env, glbl_pool);
+
+       return 0;
+}
+
+static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
+                           char *slavename, bool add)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info    *qpi;
+       struct lu_env            env;
+       int                      rc, idx;
+       ENTRY;
+
+       if (qmt->qmt_stopping)
+               RETURN(0);
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
+                             "%s: pool %s, removing %s\n",
+             obd->obd_name, poolname, slavename);
+
+       rc = server_name2index(slavename, &idx, NULL);
+       if (rc != LDD_F_SV_TYPE_OST)
+               RETURN(-EINVAL);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (IS_ERR(qpi)) {
+               CWARN("%s: can't find pool %s: rc = %ld\n",
+                     obd->obd_name, poolname, PTR_ERR(qpi));
+               GOTO(out, rc = PTR_ERR(qpi));
+       }
+
+       rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
+                  qmt_sarr_pool_rem(qpi, idx);
+       if (rc) {
+               CERROR("%s: can't %s %s pool %s: rc = %d\n",
+                      obd->obd_name, add ? "add to" : "remove from",
+                      slavename, poolname, rc);
+               GOTO(out_putref, rc);
+       }
+       qmt_pool_slv_nr_change(&env, qpi, idx, add);
+       qmt_start_pool_recalc(&env, qpi);
+
+out_putref:
+       qpi_putref(&env, qpi);
+out:
+       lu_env_fini(&env);
+       RETURN(rc);
+}
+
+/**
+ * Add a single target device to the named pool.
+ *
+ * \param[in] obd      OBD device on which to add the pool
+ * \param[in] poolname name of the pool to which to add the target \a slavename
+ * \param[in] slavename        name of the target device to be added
+ *
+ * \retval             0 if \a slavename was (previously) added to the pool
+ * \retval             negative error number on failure
+ */
+int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
+{
+       return qmt_pool_add_rem(obd, poolname, slavename, true);
+}
+
+/**
+ * Remove the named target from the specified pool.
+ *
+ * \param[in] obd      OBD device from which to remove \a poolname
+ * \param[in] poolname name of the pool to be changed
+ * \param[in] slavename        name of the target to remove from \a poolname
+ *
+ * \retval             0 on successfully removing \a slavename from the pool
+ * \retval             negative number on error (e.g. \a slavename not in pool)
+ */
+int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
+{
+       return qmt_pool_add_rem(obd, poolname, slavename, false);
+}
+
+/**
+ * Remove the named pool from the QMT device.
+ *
+ * \param[in] obd      OBD device on which pool was previously created
+ * \param[in] poolname name of pool to remove from \a obd
+ *
+ * \retval             0 on successfully removing the pool
+ * \retval             negative error numbers for failures
+ */
+int qmt_pool_del(struct obd_device *obd, char *poolname)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info    *qpi;
+       struct lu_fid            fid;
+       char                     buf[LQUOTA_NAME_MAX];
+       struct lu_env            env;
+       int                      rc;
+       int                      qtype;
+       ENTRY;
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
+              poolname);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       /* look-up pool in charge of this global index FID */
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (IS_ERR(qpi)) {
+               /* Valid case for several MDTs at the same node -
+                * pool removed by the 1st MDT in config */
+               CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
+               lu_env_fini(&env);
+               RETURN(PTR_ERR(qpi));
+       }
+
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
+               snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
+               rc = local_object_unlink(&env, qmt->qmt_child,
+                                        qpi->qpi_root, buf);
+               if (rc)
+                       CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
+                             obd->obd_name, buf, poolname, rc);
+       }
+
+       /* put ref from look-up */
+       qpi_putref(&env, qpi);
+       /* put last ref to free qpi */
+       qpi_putref(&env, qpi);
+
+       snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
+                RES_NAME(LQUOTA_RES_DT), poolname);
+       rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
+       if (rc)
+               CWARN("%s: cannot unlink dir %s: rc = %d\n",
+                     obd->obd_name, poolname, rc);
+
+       lu_env_fini(&env);
+       RETURN(0);
+}
+
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
+{
+       /* No need to initialize sarray for global pool
+        * as it always includes all slaves */
+       if (qmt_pool_global(qpi))
+               return 0;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi)
+{
+       if (qmt_pool_global(qpi))
+               return;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               if (qpi->qpi_sarr.osts.op_array)
+                       lu_tgt_pool_free(&qpi->qpi_sarr.osts);
+               return;
+       case LQUOTA_RES_MD:
+       default:
+               return;
+       }
+}
+
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
+{
+       if (qmt_pool_global(qpi))
+               return 0;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               /* to protect ost_pool use */
+               return &qpi->qpi_sarr.osts.op_rw_sem;
+       case LQUOTA_RES_MD:
+       default:
+               return NULL;
+       }
+}
+
+int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
+{
+       if (qmt_pool_global(qpi))
+               return arr_idx;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
+                        "idx invalid %d op_count %d\n", arr_idx,
+                        qpi->qpi_sarr.osts.op_count);
+               return qpi->qpi_sarr.osts.op_array[arr_idx];
+       case LQUOTA_RES_MD:
+       default:
+               return -EINVAL;
+       }
+}
+
+/* Number of slaves in a pool */
+unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return qpi->qpi_sarr.osts.op_count;
+       case LQUOTA_RES_MD:
+       default:
+               return -EINVAL;
+       }
+}