Whamcloud - gitweb
LU-11023 quota: quota pools for OSTs 15/35615/52
authorSergey Cheremencev <c17829@cray.com>
Fri, 12 Jul 2019 13:38:35 +0000 (16:38 +0300)
committerOleg Drokin <green@whamcloud.com>
Thu, 14 May 2020 05:38:29 +0000 (05:38 +0000)
Patch allows to apply quota settings
not only for the whole system, but also
for different OST pools. Since this patch
each "LOD" pool is duplicated by QMT.
Thus quota pools(QP) could be tuned by
standard lctl pool_new/add/remove/erase
commands. All QPs are subset of a global pool
that includes all data devices in a system,
including DOM. However, DOM is not yet supported.
Adding DOM support in the future should not
require much work - we just need to decide how
MDTs could be combined in a pool.

The main idea of QP is to find all pools
for the requested ID (usr/grp/prj) and apply the
minimum limit. The patch doesn't affect
qsd side, so slaves know nothing about
pools and different limits. Qunit and edquot
are calculated for each slave on master.

To apply quota on a QP, the patch adds the option "-o"
to lfs setquota. To get quotas for QP, it
provides long option "--pool" in lfs quota.
See usage examples in sanity-quota tests 1b/c/d.

Now QPs work properly only on a clean system.
Support for recalculating granted space when
OSTs are added to or removed from a pool will be
added in the next patch, together with accounting
of the space already granted to each ID in a pool.

Test-Parameters: testgroup=review-dne-part-4

Change-Id: I3396aded2156729b4fd15166eb59db59ee4c967e
Signed-off-by: Sergey Cheremencev <c17829@cray.com>
Reviewed-on: https://review.whamcloud.com/35615
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Shaun Tancheff <shaun.tancheff@hpe.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
37 files changed:
lustre/include/lu_object.h
lustre/include/lustre_quota.h
lustre/include/lustre_req_layout.h
lustre/include/lustre_swab.h
lustre/include/obd_target.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/llite/dir.c
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_pool.c
lustre/lod/lod_qos.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/obdclass/genops.c
lustre/obdclass/obd_config.c
lustre/obdclass/obd_mount.c
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/layout.c
lustre/ptlrpc/pack_generic.c
lustre/quota/lquota_disk.c
lustre/quota/lquota_entry.c
lustre/quota/lquota_internal.h
lustre/quota/qmt_dev.c
lustre/quota/qmt_entry.c
lustre/quota/qmt_handler.c
lustre/quota/qmt_internal.h
lustre/quota/qmt_lock.c
lustre/quota/qmt_pool.c
lustre/quota/qsd_entry.c
lustre/quota/qsd_handler.c
lustre/quota/qsd_lock.c
lustre/target/Makefile.am
lustre/target/tgt_pool.c [new file with mode: 0644]
lustre/tests/sanity-quota.sh
lustre/tests/test-framework.sh
lustre/utils/lfs.c

index bf2c445..e656d94 100644 (file)
@@ -39,6 +39,7 @@
 #include <lu_ref.h>
 #include <linux/percpu_counter.h>
 #include <linux/ctype.h>
+#include <obd_target.h>
 
 struct seq_file;
 struct proc_dir_entry;
@@ -1488,15 +1489,6 @@ static inline bool lu_object_is_cl(const struct lu_object *o)
        return lu_device_is_cl(o->lo_dev);
 }
 
-/* Generic subset of tgts */
-struct lu_tgt_pool {
-       __u32              *op_array;   /* array of index of
-                                        * lov_obd->lov_tgts */
-       unsigned int        op_count;   /* number of tgts in the array */
-       unsigned int        op_size;    /* allocated size of op_array */
-       struct rw_semaphore op_rw_sem;  /* to protect lu_tgt_pool use */
-};
-
 /* round-robin QoS data for LOD/LMV */
 struct lu_qos_rr {
        spinlock_t               lqr_alloc;     /* protect allocation index */
index 17ff2da..f63874c 100644 (file)
@@ -35,6 +35,7 @@
 #include <linux/fs.h>
 #include <linux/quota.h>
 #include <linux/quotaops.h>
+#include <linux/sort.h>
 #include <dt_object.h>
 #include <lustre_fid.h>
 #include <lustre_dlm.h>
index 707c27d..d634a7d 100644 (file)
@@ -283,6 +283,7 @@ extern struct req_msg_field RMF_CAPA1;
 extern struct req_msg_field RMF_CAPA2;
 extern struct req_msg_field RMF_OBD_QUOTACHECK;
 extern struct req_msg_field RMF_OBD_QUOTACTL;
+extern struct req_msg_field RMF_OBD_QUOTACTL_POOL;
 extern struct req_msg_field RMF_QUOTA_BODY;
 extern struct req_msg_field RMF_STRING;
 extern struct req_msg_field RMF_SWAP_LAYOUTS;
index bdf4f33..7db43f6 100644 (file)
@@ -62,7 +62,7 @@ void lustre_swab_obd_ioobj(struct obd_ioobj *ioo);
 void lustre_swab_niobuf_remote(struct niobuf_remote *nbr);
 void lustre_swab_ost_lvb_v1(struct ost_lvb_v1 *lvb);
 void lustre_swab_ost_lvb(struct ost_lvb *lvb);
-void lustre_swab_obd_quotactl(struct obd_quotactl *q);
+int lustre_swab_obd_quotactl(struct obd_quotactl *q, __u32 len);
 void lustre_swab_quota_body(struct quota_body *b);
 void lustre_swab_lquota_lvb(struct lquota_lvb *lvb);
 void lustre_swab_barrier_lvb(struct barrier_lvb *lvb);
index 79f29dd..406f408 100644 (file)
@@ -31,6 +31,7 @@
 
 #ifndef __OBD_TARGET_H
 #define __OBD_TARGET_H
+#include <lprocfs_status.h>
 
 /* server-side individual type definitions */
 
@@ -70,4 +71,20 @@ struct ost_obd {
        struct mutex             ost_health_mutex;
 };
 
+/* Generic subset of tgts */
+struct lu_tgt_pool {
+       __u32              *op_array;   /* array of index of
+                                        * lov_obd->lov_tgts */
+       unsigned int        op_count;   /* number of tgts in the array */
+       unsigned int        op_size;    /* allocated size of op_array */
+       struct rw_semaphore op_rw_sem;  /* to protect lu_tgt_pool use */
+};
+
+int tgt_pool_init(struct lu_tgt_pool *op, unsigned int count);
+int tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count);
+int tgt_pool_remove(struct lu_tgt_pool *op, __u32 idx);
+int tgt_pool_free(struct lu_tgt_pool *op);
+int tgt_check_index(int idx, struct lu_tgt_pool *osts);
+int tgt_pool_extend(struct lu_tgt_pool *op, unsigned int min_count);
+
 #endif /* __OBD_TARGET_H */
index fc351f8..57d2afe 100644 (file)
@@ -1467,18 +1467,23 @@ struct obd_quotactl {
        __u32                   qc_stat;
        struct obd_dqinfo       qc_dqinfo;
        struct obd_dqblk        qc_dqblk;
+       char                    qc_poolname[0];
 };
 
 #define Q_COPY(out, in, member) (out)->member = (in)->member
 
-#define QCTL_COPY(out, in)             \
-do {                                   \
-       Q_COPY(out, in, qc_cmd);        \
-       Q_COPY(out, in, qc_type);       \
-       Q_COPY(out, in, qc_id);         \
-       Q_COPY(out, in, qc_stat);       \
-       Q_COPY(out, in, qc_dqinfo);     \
-       Q_COPY(out, in, qc_dqblk);      \
+#define QCTL_COPY(out, in)                             \
+do {                                                   \
+       Q_COPY(out, in, qc_cmd);                        \
+       Q_COPY(out, in, qc_type);                       \
+       Q_COPY(out, in, qc_id);                         \
+       Q_COPY(out, in, qc_stat);                       \
+       Q_COPY(out, in, qc_dqinfo);                     \
+       Q_COPY(out, in, qc_dqblk);                      \
+       if (LUSTRE_Q_CMD_IS_POOL(in->qc_cmd))           \
+               memcpy(out->qc_poolname,                \
+                      in->qc_poolname,                 \
+                      LOV_MAXPOOLNAME + 1);            \
 } while (0)
 
 /* Body of quota request used for quota acquire/release RPCs between quota
index e520f88..93fc33a 100644 (file)
@@ -1199,7 +1199,10 @@ static inline __u64 lustre_stoqb(__kernel_size_t space)
 #define LUSTRE_Q_FINVALIDATE 0x80000c     /* deprecated as of 2.4 */
 #define LUSTRE_Q_GETDEFAULT  0x80000d     /* get default quota */
 #define LUSTRE_Q_SETDEFAULT  0x80000e     /* set default quota */
-
+#define LUSTRE_Q_GETQUOTAPOOL  0x80000f  /* get user pool quota */
+#define LUSTRE_Q_SETQUOTAPOOL  0x800010  /* set user pool quota */
+#define LUSTRE_Q_GETINFOPOOL   0x800011  /* get pool quota info */
+#define LUSTRE_Q_SETINFOPOOL   0x800012  /* set pool quota info */
 /* In the current Lustre implementation, the grace time is either the time
  * or the timestamp to be used after some quota ID exceeds the soft limt,
  * 48 bits should be enough, its high 16 bits can be used as quota flags.
@@ -1225,6 +1228,12 @@ static inline __u64 lustre_stoqb(__kernel_size_t space)
  * */
 #define LQUOTA_FLAG_DEFAULT    0x0001
 
+#define LUSTRE_Q_CMD_IS_POOL(cmd)              \
+       (cmd == LUSTRE_Q_GETQUOTAPOOL ||        \
+        cmd == LUSTRE_Q_SETQUOTAPOOL ||        \
+        cmd == LUSTRE_Q_SETINFOPOOL ||         \
+        cmd == LUSTRE_Q_GETINFOPOOL)
+
 #define ALLQUOTA 255       /* set all quota */
 static inline const char *qtype_name(int qtype)
 {
@@ -1335,6 +1344,7 @@ struct if_quotactl {
        struct obd_dqblk        qc_dqblk;
        char                    obd_type[16];
        struct obd_uuid         obd_uuid;
+       char                    qc_poolname[0];
 };
 
 /* swap layout flags */
index 4f54260..66a72f7 100644 (file)
@@ -1131,29 +1131,34 @@ static int quotactl_ioctl(struct ll_sb_info *sbi, struct if_quotactl *qctl)
        case Q_SETQUOTA:
        case Q_SETINFO:
        case LUSTRE_Q_SETDEFAULT:
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                        RETURN(-EPERM);
                break;
        case Q_GETQUOTA:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_GETQUOTAPOOL:
                if (check_owner(type, id) &&
                    (!cfs_capable(CFS_CAP_SYS_ADMIN)))
                        RETURN(-EPERM);
                break;
        case Q_GETINFO:
+       case LUSTRE_Q_GETINFOPOOL:
                break;
        default:
                CERROR("unsupported quotactl op: %#x\n", cmd);
                RETURN(-ENOTSUPP);
        }
 
-        if (valid != QC_GENERAL) {
-                if (cmd == Q_GETINFO)
-                        qctl->qc_cmd = Q_GETOINFO;
-                else if (cmd == Q_GETQUOTA)
-                        qctl->qc_cmd = Q_GETOQUOTA;
-                else
-                        RETURN(-EINVAL);
+       if (valid != QC_GENERAL) {
+               if (cmd == Q_GETINFO)
+                       qctl->qc_cmd = Q_GETOINFO;
+               else if (cmd == Q_GETQUOTA ||
+                        cmd == LUSTRE_Q_GETQUOTAPOOL)
+                       qctl->qc_cmd = Q_GETOQUOTA;
+               else
+                       RETURN(-EINVAL);
 
                 switch (valid) {
                 case QC_MDTIDX:
@@ -1183,22 +1188,26 @@ static int quotactl_ioctl(struct ll_sb_info *sbi, struct if_quotactl *qctl)
                 qctl->qc_cmd = cmd;
         } else {
                 struct obd_quotactl *oqctl;
+               int oqctl_len = sizeof(*oqctl);
 
-                OBD_ALLOC_PTR(oqctl);
-                if (oqctl == NULL)
-                        RETURN(-ENOMEM);
+               if (LUSTRE_Q_CMD_IS_POOL(cmd))
+                       oqctl_len += LOV_MAXPOOLNAME + 1;
 
-                QCTL_COPY(oqctl, qctl);
-                rc = obd_quotactl(sbi->ll_md_exp, oqctl);
-                if (rc) {
-                        OBD_FREE_PTR(oqctl);
-                        RETURN(rc);
-                }
+               OBD_ALLOC(oqctl, oqctl_len);
+               if (oqctl == NULL)
+                       RETURN(-ENOMEM);
+
+               QCTL_COPY(oqctl, qctl);
+               rc = obd_quotactl(sbi->ll_md_exp, oqctl);
+               if (rc) {
+                       OBD_FREE(oqctl, oqctl_len);
+                       RETURN(rc);
+               }
                 /* If QIF_SPACE is not set, client should collect the
                  * space usage from OSSs by itself */
-                if (cmd == Q_GETQUOTA &&
-                    !(oqctl->qc_dqblk.dqb_valid & QIF_SPACE) &&
-                    !oqctl->qc_dqblk.dqb_curspace) {
+               if ((cmd == Q_GETQUOTA || cmd == LUSTRE_Q_GETQUOTAPOOL) &&
+                   !(oqctl->qc_dqblk.dqb_valid & QIF_SPACE) &&
+                   !oqctl->qc_dqblk.dqb_curspace) {
                         struct obd_quotactl *oqctl_tmp;
 
                         OBD_ALLOC_PTR(oqctl_tmp);
@@ -1235,11 +1244,11 @@ static int quotactl_ioctl(struct ll_sb_info *sbi, struct if_quotactl *qctl)
                         OBD_FREE_PTR(oqctl_tmp);
                 }
 out:
-                QCTL_COPY(qctl, oqctl);
-                OBD_FREE_PTR(oqctl);
-        }
+               QCTL_COPY(qctl, oqctl);
+               OBD_FREE(oqctl, oqctl_len);
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 int ll_rmfid(struct file *file, void __user *arg)
@@ -1845,24 +1854,32 @@ out_req:
                return rc;
        }
        case OBD_IOC_QUOTACTL: {
-                struct if_quotactl *qctl;
+               struct if_quotactl *qctl;
+               int qctl_len = sizeof(*qctl) + LOV_MAXPOOLNAME + 1;
 
-                OBD_ALLOC_PTR(qctl);
-                if (!qctl)
-                        RETURN(-ENOMEM);
+               OBD_ALLOC(qctl, qctl_len);
+               if (!qctl)
+                       RETURN(-ENOMEM);
 
                if (copy_from_user(qctl, (void __user *)arg, sizeof(*qctl)))
-                        GOTO(out_quotactl, rc = -EFAULT);
-
-                rc = quotactl_ioctl(sbi, qctl);
+                       GOTO(out_quotactl, rc = -EFAULT);
+
+               if (LUSTRE_Q_CMD_IS_POOL(qctl->qc_cmd)) {
+                       char __user *from = (char __user *)arg +
+                                       offsetof(typeof(*qctl), qc_poolname);
+                       if (copy_from_user(qctl->qc_poolname, from,
+                                          LOV_MAXPOOLNAME + 1))
+                               GOTO(out_quotactl, rc = -EFAULT);
+               }
 
+               rc = quotactl_ioctl(sbi, qctl);
                if (rc == 0 &&
                    copy_to_user((void __user *)arg, qctl, sizeof(*qctl)))
                         rc = -EFAULT;
 
-        out_quotactl:
-                OBD_FREE_PTR(qctl);
-                RETURN(rc);
+out_quotactl:
+               OBD_FREE(qctl, qctl_len);
+               RETURN(rc);
         }
         case OBD_IOC_GETDTNAME:
         case OBD_IOC_GETMDNAME:
index 225cdd1..df3eeba 100644 (file)
@@ -620,11 +620,6 @@ int lod_alloc_comp_entries(struct lod_object *lo, int mirror_cnt, int comp_cnt);
 int lod_fill_mirrors(struct lod_object *lo);
 
 /* lod_pool.c */
-int lod_tgt_pool_init(struct lu_tgt_pool *op, unsigned int count);
-int lod_tgt_pool_free(struct lu_tgt_pool *op);
-int lod_tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count);
-int lod_tgt_pool_remove(struct lu_tgt_pool *op, __u32 idx);
-int lod_tgt_pool_extend(struct lu_tgt_pool *op, unsigned int min_count);
 struct pool_desc *lod_find_pool(struct lod_device *lod, char *poolname);
 void lod_pool_putref(struct pool_desc *pool);
 int lod_pool_del(struct obd_device *obd, char *poolname);
index 91327ee..d0a711e 100644 (file)
@@ -87,8 +87,8 @@ void lod_putref(struct lod_device *lod, struct lod_tgt_descs *ltd)
                                continue;
 
                        list_add(&tgt_desc->ltd_kill, &kill);
-                       lod_tgt_pool_remove(&ltd->ltd_tgt_pool,
-                                           tgt_desc->ltd_index);
+                       tgt_pool_remove(&ltd->ltd_tgt_pool,
+                                       tgt_desc->ltd_index);
                        ltd_del_tgt(ltd, tgt_desc);
                        ltd->ltd_death_row--;
                }
@@ -252,8 +252,8 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
        if (rc)
                GOTO(out_del_tgt, rc);
 
-       rc = lod_tgt_pool_add(&ltd->ltd_tgt_pool, index,
-                             ltd->ltd_lov_desc.ld_tgt_count);
+       rc = tgt_pool_add(&ltd->ltd_tgt_pool, index,
+                         ltd->ltd_lov_desc.ld_tgt_count);
        if (rc) {
                CERROR("%s: can't set up pool, failed with %d\n",
                       obd->obd_name, rc);
@@ -288,7 +288,7 @@ out_fini_llog:
 out_ltd:
        down_write(&ltd->ltd_rw_sem);
        mutex_lock(&ltd->ltd_mutex);
-       lod_tgt_pool_remove(&ltd->ltd_tgt_pool, index);
+       tgt_pool_remove(&ltd->ltd_tgt_pool, index);
 out_del_tgt:
        ltd_del_tgt(ltd, tgt_desc);
 out_mutex:
@@ -2174,30 +2174,30 @@ int lod_pools_init(struct lod_device *lod, struct lustre_cfg *lcfg)
 
        INIT_LIST_HEAD(&lod->lod_pool_list);
        lod->lod_pool_count = 0;
-       rc = lod_tgt_pool_init(&lod->lod_mdt_descs.ltd_tgt_pool, 0);
+       rc = tgt_pool_init(&lod->lod_mdt_descs.ltd_tgt_pool, 0);
        if (rc)
                GOTO(out_hash, rc);
 
-       rc = lod_tgt_pool_init(&lod->lod_mdt_descs.ltd_qos.lq_rr.lqr_pool, 0);
+       rc = tgt_pool_init(&lod->lod_mdt_descs.ltd_qos.lq_rr.lqr_pool, 0);
        if (rc)
                GOTO(out_mdt_pool, rc);
 
-       rc = lod_tgt_pool_init(&lod->lod_ost_descs.ltd_tgt_pool, 0);
+       rc = tgt_pool_init(&lod->lod_ost_descs.ltd_tgt_pool, 0);
        if (rc)
                GOTO(out_mdt_rr_pool, rc);
 
-       rc = lod_tgt_pool_init(&lod->lod_ost_descs.ltd_qos.lq_rr.lqr_pool, 0);
+       rc = tgt_pool_init(&lod->lod_ost_descs.ltd_qos.lq_rr.lqr_pool, 0);
        if (rc)
                GOTO(out_ost_pool, rc);
 
        RETURN(0);
 
 out_ost_pool:
-       lod_tgt_pool_free(&lod->lod_ost_descs.ltd_tgt_pool);
+       tgt_pool_free(&lod->lod_ost_descs.ltd_tgt_pool);
 out_mdt_rr_pool:
-       lod_tgt_pool_free(&lod->lod_mdt_descs.ltd_qos.lq_rr.lqr_pool);
+       tgt_pool_free(&lod->lod_mdt_descs.ltd_qos.lq_rr.lqr_pool);
 out_mdt_pool:
-       lod_tgt_pool_free(&lod->lod_mdt_descs.ltd_tgt_pool);
+       tgt_pool_free(&lod->lod_mdt_descs.ltd_tgt_pool);
 out_hash:
        lod_pool_hash_destroy(&lod->lod_pools_hash_body);
 
@@ -2227,10 +2227,10 @@ int lod_pools_fini(struct lod_device *lod)
        }
 
        lod_pool_hash_destroy(&lod->lod_pools_hash_body);
-       lod_tgt_pool_free(&lod->lod_ost_descs.ltd_qos.lq_rr.lqr_pool);
-       lod_tgt_pool_free(&lod->lod_ost_descs.ltd_tgt_pool);
-       lod_tgt_pool_free(&lod->lod_mdt_descs.ltd_qos.lq_rr.lqr_pool);
-       lod_tgt_pool_free(&lod->lod_mdt_descs.ltd_tgt_pool);
+       tgt_pool_free(&lod->lod_ost_descs.ltd_qos.lq_rr.lqr_pool);
+       tgt_pool_free(&lod->lod_ost_descs.ltd_tgt_pool);
+       tgt_pool_free(&lod->lod_mdt_descs.ltd_qos.lq_rr.lqr_pool);
+       tgt_pool_free(&lod->lod_mdt_descs.ltd_tgt_pool);
 
        RETURN(0);
 }
index 04d81dd..55932d5 100644 (file)
@@ -99,8 +99,8 @@ void lod_pool_putref(struct pool_desc *pool)
        if (atomic_dec_and_test(&pool->pool_refcount)) {
                LASSERT(list_empty(&pool->pool_list));
                LASSERT(pool->pool_proc_entry == NULL);
-               lod_tgt_pool_free(&(pool->pool_rr.lqr_pool));
-               lod_tgt_pool_free(&(pool->pool_obds));
+               tgt_pool_free(&(pool->pool_rr.lqr_pool));
+               tgt_pool_free(&(pool->pool_obds));
                kfree_rcu(pool, pool_rcu);
                EXIT;
        }
@@ -358,183 +358,6 @@ void lod_dump_pool(int level, struct pool_desc *pool)
        lod_pool_putref(pool);
 }
 
-/**
- * Initialize the pool data structures at startup.
- *
- * Allocate and initialize the pool data structures with the specified
- * array size.  If pool count is not specified (\a count == 0), then
- * POOL_INIT_COUNT will be used.  Allocating a non-zero initial array
- * size avoids the need to reallocate as new pools are added.
- *
- * \param[in] op       pool structure
- * \param[in] count    initial size of the target op_array[] array
- *
- * \retval             0 indicates successful pool initialization
- * \retval             negative error number on failure
- */
-#define POOL_INIT_COUNT 2
-int lod_tgt_pool_init(struct lu_tgt_pool *op, unsigned int count)
-{
-       ENTRY;
-
-       if (count == 0)
-               count = POOL_INIT_COUNT;
-       op->op_array = NULL;
-       op->op_count = 0;
-       init_rwsem(&op->op_rw_sem);
-       op->op_size = count * sizeof(op->op_array[0]);
-       OBD_ALLOC(op->op_array, op->op_size);
-       if (op->op_array == NULL) {
-               op->op_size = 0;
-               RETURN(-ENOMEM);
-       }
-       EXIT;
-       return 0;
-}
-
-/**
- * Increase the op_array size to hold more targets in this pool.
- *
- * The size is increased to at least \a min_count, but may be larger
- * for an existing pool since ->op_array[] is growing exponentially.
- * Caller must hold write op_rwlock.
- *
- * \param[in] op       pool structure
- * \param[in] min_count        minimum number of entries to handle
- *
- * \retval             0 on success
- * \retval             negative error number on failure.
- */
-int lod_tgt_pool_extend(struct lu_tgt_pool *op, unsigned int min_count)
-{
-       __u32 *new;
-       __u32 new_size;
-
-       LASSERT(min_count != 0);
-
-       if (op->op_count * sizeof(op->op_array[0]) < op->op_size)
-               return 0;
-
-       new_size = max_t(__u32, min_count * sizeof(op->op_array[0]),
-                        2 * op->op_size);
-       OBD_ALLOC(new, new_size);
-       if (new == NULL)
-               return -ENOMEM;
-
-       /* copy old array to new one */
-       memcpy(new, op->op_array, op->op_size);
-       OBD_FREE(op->op_array, op->op_size);
-       op->op_array = new;
-       op->op_size = new_size;
-
-       return 0;
-}
-
-/**
- * Add a new target to an existing pool.
- *
- * Add a new target device to the pool previously created and returned by
- * lod_pool_new().  Each target can only be in each pool at most one time.
- *
- * \param[in] op       target pool to add new entry
- * \param[in] idx      pool index number to add to the \a op array
- * \param[in] min_count        minimum number of entries to expect in the pool
- *
- * \retval             0 if target could be added to the pool
- * \retval             negative error if target \a idx was not added
- */
-int lod_tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count)
-{
-       unsigned int i;
-       int rc = 0;
-       ENTRY;
-
-       down_write(&op->op_rw_sem);
-
-       rc = lod_tgt_pool_extend(op, min_count);
-       if (rc)
-               GOTO(out, rc);
-
-       /* search ost in pool array */
-       for (i = 0; i < op->op_count; i++) {
-               if (op->op_array[i] == idx)
-                       GOTO(out, rc = -EEXIST);
-       }
-       /* ost not found we add it */
-       op->op_array[op->op_count] = idx;
-       op->op_count++;
-       EXIT;
-out:
-       up_write(&op->op_rw_sem);
-       return rc;
-}
-
-/**
- * Remove an existing pool from the system.
- *
- * The specified pool must have previously been allocated by
- * lod_pool_new() and not have any target members in the pool.
- * If the removed target is not the last, compact the array
- * to remove empty spaces.
- *
- * \param[in] op       pointer to the original data structure
- * \param[in] idx      target index to be removed
- *
- * \retval             0 on success
- * \retval             negative error number on failure
- */
-int lod_tgt_pool_remove(struct lu_tgt_pool *op, __u32 idx)
-{
-       unsigned int i;
-       ENTRY;
-
-       down_write(&op->op_rw_sem);
-
-       for (i = 0; i < op->op_count; i++) {
-               if (op->op_array[i] == idx) {
-                       memmove(&op->op_array[i], &op->op_array[i + 1],
-                               (op->op_count - i - 1) *
-                               sizeof(op->op_array[0]));
-                       op->op_count--;
-                       up_write(&op->op_rw_sem);
-                       EXIT;
-                       return 0;
-               }
-       }
-
-       up_write(&op->op_rw_sem);
-       RETURN(-EINVAL);
-}
-
-/**
- * Free the pool after it was emptied and removed from /proc.
- *
- * Note that all of the child/target entries referenced by this pool
- * must have been removed by lod_ost_pool_remove() before it can be
- * deleted from memory.
- *
- * \param[in] op       pool to be freed.
- *
- * \retval             0 on success or if pool was already freed
- */
-int lod_tgt_pool_free(struct lu_tgt_pool *op)
-{
-       ENTRY;
-
-       if (op->op_size == 0)
-               RETURN(0);
-
-       down_write(&op->op_rw_sem);
-
-       OBD_FREE(op->op_array, op->op_size);
-       op->op_array = NULL;
-       op->op_count = 0;
-       op->op_size = 0;
-
-       up_write(&op->op_rw_sem);
-       RETURN(0);
-}
-
 static void pools_hash_exit(void *vpool, void *data)
 {
        struct pool_desc *pool = vpool;
@@ -584,13 +407,13 @@ int lod_pool_new(struct obd_device *obd, char *poolname)
        strlcpy(new_pool->pool_name, poolname, sizeof(new_pool->pool_name));
        new_pool->pool_lobd = obd;
        atomic_set(&new_pool->pool_refcount, 1);
-       rc = lod_tgt_pool_init(&new_pool->pool_obds, 0);
+       rc = tgt_pool_init(&new_pool->pool_obds, 0);
        if (rc)
                GOTO(out_err, rc);
 
        lu_qos_rr_init(&new_pool->pool_rr);
 
-       rc = lod_tgt_pool_init(&new_pool->pool_rr.lqr_pool, 0);
+       rc = tgt_pool_init(&new_pool->pool_rr.lqr_pool, 0);
        if (rc)
                GOTO(out_free_pool_obds, rc);
 
@@ -641,9 +464,9 @@ out_err:
 
        lprocfs_remove(&new_pool->pool_proc_entry);
 
-       lod_tgt_pool_free(&new_pool->pool_rr.lqr_pool);
+       tgt_pool_free(&new_pool->pool_rr.lqr_pool);
 out_free_pool_obds:
-       lod_tgt_pool_free(&new_pool->pool_obds);
+       tgt_pool_free(&new_pool->pool_obds);
        OBD_FREE_PTR(new_pool);
        return rc;
 }
@@ -736,7 +559,7 @@ int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname)
        if (rc)
                GOTO(out, rc);
 
-       rc = lod_tgt_pool_add(&pool->pool_obds, tgt->ltd_index,
+       rc = tgt_pool_add(&pool->pool_obds, tgt->ltd_index,
                              lod->lod_ost_count);
        if (rc)
                GOTO(out, rc);
@@ -800,7 +623,7 @@ int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
        if (rc)
                GOTO(out, rc);
 
-       lod_tgt_pool_remove(&pool->pool_obds, ost->ltd_index);
+       tgt_pool_remove(&pool->pool_obds, ost->ltd_index);
        pool->pool_rr.lqr_dirty = 1;
 
        CDEBUG(D_CONFIG, "%s removed from "LOV_POOLNAMEF"\n", ostname,
@@ -827,23 +650,10 @@ out:
  */
 int lod_check_index_in_pool(__u32 idx, struct pool_desc *pool)
 {
-       unsigned int i;
        int rc;
-       ENTRY;
 
        pool_getref(pool);
-
-       down_read(&pool_tgt_rw_sem(pool));
-
-       for (i = 0; i < pool_tgt_count(pool); i++) {
-               if (pool_tgt_array(pool)[i] == idx)
-                       GOTO(out, rc = 0);
-       }
-       rc = -ENOENT;
-       EXIT;
-out:
-       up_read(&pool_tgt_rw_sem(pool));
-
+       rc = tgt_check_index(idx, &pool->pool_obds);
        lod_pool_putref(pool);
        return rc;
 }
index 65d0af8..69221ce 100644 (file)
@@ -280,7 +280,7 @@ static int lod_qos_calc_rr(struct lod_device *lod, struct lu_tgt_descs *ltd,
           deleting from the pool. The lq_rw_sem insures that nobody else
           is reading. */
        lqr->lqr_pool.op_count = real_count;
-       rc = lod_tgt_pool_extend(&lqr->lqr_pool, real_count);
+       rc = tgt_pool_extend(&lqr->lqr_pool, real_count);
        if (rc) {
                up_write(&ltd->ltd_qos.lq_rw_sem);
                RETURN(rc);
index 4ea3302..233be24 100644 (file)
@@ -2044,34 +2044,51 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
 static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
                         struct obd_quotactl *oqctl)
 {
-       struct ptlrpc_request   *req;
-       struct obd_quotactl     *oqc;
-       int                      rc;
+       struct ptlrpc_request *req;
+       struct obd_quotactl *oqc;
+       int rc;
        ENTRY;
 
-       req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                       &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION,
-                                       MDS_QUOTACTL);
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_QUOTACTL);
        if (req == NULL)
                RETURN(-ENOMEM);
 
+
+       if (LUSTRE_Q_CMD_IS_POOL(oqctl->qc_cmd))
+               req_capsule_set_size(&req->rq_pill,
+                                    &RMF_OBD_QUOTACTL,
+                                    RCL_CLIENT,
+                                    sizeof(*oqc) + LOV_MAXPOOLNAME + 1);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION,
+                                MDS_QUOTACTL);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
        oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-       *oqc = *oqctl;
+       QCTL_COPY(oqc, oqctl);
 
        ptlrpc_request_set_replen(req);
        ptlrpc_at_set_req_timeout(req);
 
        rc = ptlrpc_queue_wait(req);
-       if (rc)
-               CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
+       if (rc) {
+               CERROR("%s: ptlrpc_queue_wait failed: rc = %d\n",
+                      exp->exp_obd->obd_name, rc);
+               GOTO(out, rc);
+       }
 
        if (req->rq_repmsg &&
            (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) {
-               *oqctl = *oqc;
+               QCTL_COPY(oqctl, oqc);
        } else if (!rc) {
-               CERROR ("Can't unpack obd_quotactl\n");
                rc = -EPROTO;
+               CERROR("%s: cannot unpack obd_quotactl: rc = %d\n",
+                       exp->exp_obd->obd_name, rc);
        }
+out:
        ptlrpc_req_finished(req);
 
        RETURN(rc);
index 01a3fd2..aa843fe 100644 (file)
@@ -2707,17 +2707,17 @@ put:
  */
 static int mdt_quotactl(struct tgt_session_info *tsi)
 {
-       struct obd_export       *exp  = tsi->tsi_exp;
-       struct req_capsule      *pill = tsi->tsi_pill;
-       struct obd_quotactl     *oqctl, *repoqc;
-       int                      id, rc;
-       struct mdt_device       *mdt = mdt_exp2dev(exp);
-       struct lu_device        *qmt = mdt->mdt_qmt_dev;
-       struct lu_nodemap       *nodemap;
+       struct obd_export *exp  = tsi->tsi_exp;
+       struct req_capsule *pill = tsi->tsi_pill;
+       struct obd_quotactl *oqctl, *repoqc;
+       int id, rc;
+       struct mdt_device *mdt = mdt_exp2dev(exp);
+       struct lu_device *qmt = mdt->mdt_qmt_dev;
+       struct lu_nodemap *nodemap;
        ENTRY;
 
        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
-       if (oqctl == NULL)
+       if (!oqctl)
                RETURN(err_serious(-EPROTO));
 
        rc = req_capsule_server_pack(pill);
@@ -2733,12 +2733,16 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_SETINFO:
        case Q_SETQUOTA:
        case LUSTRE_Q_SETDEFAULT:
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
                if (!nodemap_can_setquota(nodemap))
                        GOTO(out_nodemap, rc = -EPERM);
                /* fallthrough */
        case Q_GETINFO:
        case Q_GETQUOTA:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_GETQUOTAPOOL:
+       case LUSTRE_Q_GETINFOPOOL:
                if (qmt == NULL)
                        GOTO(out_nodemap, rc = -EOPNOTSUPP);
                /* slave quotactl */
@@ -2747,8 +2751,10 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_GETOQUOTA:
                break;
        default:
-               CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
-               GOTO(out_nodemap, rc = -EFAULT);
+               rc = -EFAULT;
+               CERROR("%s: unsupported quotactl command %d: rc = %d\n",
+                      mdt_obd_name(mdt), oqctl->qc_cmd, rc);
+               GOTO(out_nodemap, rc);
        }
 
        id = oqctl->qc_id;
@@ -2791,6 +2797,10 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_GETQUOTA:
        case LUSTRE_Q_SETDEFAULT:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_GETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
+       case LUSTRE_Q_GETINFOPOOL:
                /* forward quotactl request to QMT */
                rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
                break;
@@ -2810,8 +2820,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
-       *repoqc = *oqctl;
-
+       QCTL_COPY(repoqc, oqctl);
        EXIT;
 
 out_nodemap:
index 5ee07ed..bd59d56 100644 (file)
@@ -665,6 +665,7 @@ struct obd_device *class_num2obd(int num)
 
         return obd;
 }
+EXPORT_SYMBOL(class_num2obd);
 
 /**
  * Find obd in obd_dev[] by name or uuid.
index 29986fb..aef076f 100644 (file)
@@ -1240,6 +1240,20 @@ void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg))
 EXPORT_SYMBOL(lustre_register_quota_process_config);
 #endif /* HAVE_SERVER_SUPPORT */
 
+#define QMT0_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-QMT0000"))
+static struct obd_device *obd_find_qmt0(char *obd_name)
+{
+       char qmt_name[QMT0_DEV_NAME_LEN];
+       struct obd_device *qmt = NULL;
+
+       if (!server_name2fsname(obd_name, qmt_name, NULL)) {
+               strlcat(qmt_name, "-QMT0000", QMT0_DEV_NAME_LEN);
+               qmt = class_name2obd(qmt_name);
+       }
+
+       return qmt;
+}
+
 /**
  * Process configuration commands given in lustre_cfg form.
  * These may come from direct calls (e.g. class_manual_cleanup)
@@ -1421,20 +1435,42 @@ int class_process_config(struct lustre_cfg *lcfg)
        }
        case LCFG_POOL_NEW: {
                err = obd_pool_new(obd, lustre_cfg_string(lcfg, 2));
+               if (!err && !strcmp(obd->obd_type->typ_name, LUSTRE_LOD_NAME)) {
+                       obd = obd_find_qmt0(obd->obd_name);
+                       if (obd)
+                               obd_pool_new(obd, lustre_cfg_string(lcfg, 2));
+               }
                GOTO(out, err = 0);
        }
        case LCFG_POOL_ADD: {
                err = obd_pool_add(obd, lustre_cfg_string(lcfg, 2),
                                    lustre_cfg_string(lcfg, 3));
+               if (!err && !strcmp(obd->obd_type->typ_name, LUSTRE_LOD_NAME)) {
+                       obd = obd_find_qmt0(obd->obd_name);
+                       if (obd)
+                               obd_pool_add(obd, lustre_cfg_string(lcfg, 2),
+                                            lustre_cfg_string(lcfg, 3));
+               }
                GOTO(out, err = 0);
        }
        case LCFG_POOL_REM: {
                err = obd_pool_rem(obd, lustre_cfg_string(lcfg, 2),
                                    lustre_cfg_string(lcfg, 3));
+               if (!err && !strcmp(obd->obd_type->typ_name, LUSTRE_LOD_NAME)) {
+                       obd = obd_find_qmt0(obd->obd_name);
+                       if (obd)
+                               obd_pool_rem(obd, lustre_cfg_string(lcfg, 2),
+                                            lustre_cfg_string(lcfg, 3));
+               }
                GOTO(out, err = 0);
        }
        case LCFG_POOL_DEL: {
                err = obd_pool_del(obd, lustre_cfg_string(lcfg, 2));
+               if (!err && !strcmp(obd->obd_type->typ_name, LUSTRE_LOD_NAME)) {
+                       obd = obd_find_qmt0(obd->obd_name);
+                       if (obd)
+                               obd_pool_del(obd, lustre_cfg_string(lcfg, 2));
+               }
                GOTO(out, err = 0);
        }
        /*
index 0c68d2f..cd9c7d4 100644 (file)
@@ -696,6 +696,7 @@ int lustre_put_lsi(struct super_block *sb)
        }
        RETURN(0);
 }
+EXPORT_SYMBOL(lustre_put_lsi);
 
 /*
  * The goal of this function is to extract the file system name
index 3cb1fb8..31e956f 100644 (file)
@@ -15,6 +15,7 @@ target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
 target_objs += $(TARGET)out_lib.o $(TARGET)update_trans.o
 target_objs += $(TARGET)update_records.o $(TARGET)update_recovery.o
 target_objs += $(TARGET)tgt_grant.o $(TARGET)tgt_fmd.o
+target_objs += $(TARGET)tgt_pool.o
 
 ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o
 ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o
index 67a4884..cb8a2db 100644 (file)
@@ -100,8 +100,8 @@ static const struct req_msg_field *mdt_body_capa[] = {
 };
 
 static const struct req_msg_field *quotactl_only[] = {
-        &RMF_PTLRPC_BODY,
-        &RMF_OBD_QUOTACTL
+       &RMF_PTLRPC_BODY,
+       &RMF_OBD_QUOTACTL
 };
 
 static const struct req_msg_field *quota_body_only[] = {
@@ -995,9 +995,10 @@ struct req_msg_field RMF_MDT_BODY =
 EXPORT_SYMBOL(RMF_MDT_BODY);
 
 struct req_msg_field RMF_OBD_QUOTACTL =
-        DEFINE_MSGF("obd_quotactl", 0,
-                    sizeof(struct obd_quotactl),
-                    lustre_swab_obd_quotactl, NULL);
+       DEFINE_MSGFL("obd_quotactl",
+                    0,
+                    sizeof(struct obd_quotactl),
+                    lustre_swab_obd_quotactl, NULL);
 EXPORT_SYMBOL(RMF_OBD_QUOTACTL);
 
 struct req_msg_field RMF_QUOTA_BODY =
index 087c491..8cc0ff6 100644 (file)
@@ -2073,14 +2073,19 @@ static void lustre_swab_obd_dqblk(struct obd_dqblk *b)
        BUILD_BUG_ON(offsetof(typeof(*b), dqb_padding) == 0);
 }
 
-void lustre_swab_obd_quotactl(struct obd_quotactl *q)
+int lustre_swab_obd_quotactl(struct obd_quotactl *q, __u32 len)
 {
+       if (unlikely(len <= sizeof(struct obd_quotactl)))
+               return -EOVERFLOW;
+
        __swab32s(&q->qc_cmd);
        __swab32s(&q->qc_type);
        __swab32s(&q->qc_id);
        __swab32s(&q->qc_stat);
        lustre_swab_obd_dqinfo(&q->qc_dqinfo);
        lustre_swab_obd_dqblk(&q->qc_dqblk);
+
+       return len;
 }
 
 void lustre_swab_fid2path(struct getinfo_fid2path *gf)
index 98acb04..4353f1a 100644 (file)
@@ -541,7 +541,7 @@ int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
                if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
                        goto next;
 
-               rc = func(env, glb_fid, (char *)key, &qti->qti_fid, arg);
+               rc = func(env, glb_fid, qti->qti_buf, &qti->qti_fid, arg);
                if (rc)
                        break;
 next:
index e94cd66..ae495d8 100644 (file)
@@ -285,8 +285,10 @@ static void lqe_init(struct lquota_entry *lqe)
  *
  * \param env - the environment passed by the caller
  * \param lqe - is the quota entry to refresh
+ * \param find - don't create entry on disk if true
  */
-static int lqe_read(const struct lu_env *env, struct lquota_entry *lqe)
+static int lqe_read(const struct lu_env *env,
+                   struct lquota_entry *lqe, bool find)
 {
        struct lquota_site      *site;
        int                      rc;
@@ -299,7 +301,7 @@ static int lqe_read(const struct lu_env *env, struct lquota_entry *lqe)
 
        LQUOTA_DEBUG(lqe, "read");
 
-       rc = site->lqs_ops->lqe_read(env, lqe, site->lqs_parent);
+       rc = site->lqs_ops->lqe_read(env, lqe, site->lqs_parent, find);
        if (rc == 0)
                /* mark the entry as up-to-date */
                lqe->lqe_uptodate = true;
@@ -313,12 +315,15 @@ static int lqe_read(const struct lu_env *env, struct lquota_entry *lqe)
  * \param env  - the environment passed by the caller
  * \param site - lquota site which stores quota entries in a hash table
  * \param qid  - is the quota ID to be found/created
+ * \param find - don't create lqe on disk in case of ENOENT if true
  *
  * \retval 0     - success
  * \retval -ve   - failure
  */
-struct lquota_entry *lqe_locate(const struct lu_env *env,
-                               struct lquota_site *site, union lquota_id *qid)
+struct lquota_entry *lqe_locate_find(const struct lu_env *env,
+                                    struct lquota_site *site,
+                                    union lquota_id *qid,
+                                    bool find)
 {
        struct lquota_entry     *lqe, *new = NULL;
        int                      rc = 0;
@@ -350,7 +355,7 @@ struct lquota_entry *lqe_locate(const struct lu_env *env,
        lqe_init(new);
 
        /* read quota settings from disk and mark lqe as up-to-date */
-       rc = lqe_read(env, new);
+       rc = lqe_read(env, new, find);
        if (rc)
                GOTO(out, lqe = ERR_PTR(rc));
 
index 08f18aa..f13c1dc 100644 (file)
@@ -76,7 +76,7 @@ struct lquota_entry_operations {
 
        /* Read quota settings from disk and update lquota entry */
        int (*lqe_read)(const struct lu_env *, struct lquota_entry *,
-                       void *arg);
+                       void *arg, bool find);
 
        /* Print debug information about a given lquota entry */
        void (*lqe_debug)(struct lquota_entry *, void *,
@@ -171,6 +171,8 @@ struct lquota_entry {
 
        /* current quota settings/usage of this ID */
        __u64           lqe_granted; /* granted limit, inodes or kbytes */
+       /* used in quota pool recalc process (only on QMT) */
+       __u64           lqe_recalc_granted;
        __u64           lqe_qunit; /* [ib]unit size, inodes or kbytes */
        union {
                struct  lquota_mst_entry me; /* params specific to QMT */
@@ -183,7 +185,32 @@ struct lquota_entry {
                        lqe_edquot:1,     /* id out of quota space on QMT */
                        lqe_gl:1,         /* glimpse is in progress */
                        lqe_nopreacq:1,   /* pre-acquire disabled */
-                       lqe_is_default:1; /* the default quota is used */
+                       lqe_is_default:1, /* the default quota is used */
+                       lqe_is_global:1;  /* lqe belongs to global pool "0x0" */
+
+       struct lqe_glbl_data    *lqe_glbl_data;
+};
+
+#define lqe_qtype(lqe)         (lqe->lqe_site->lqs_qtype)
+#define lqe_rtype(lqe)         (lqe2qpi(lqe)->qpi_rtype)
+
+struct lqe_glbl_entry {
+       __u64                    lge_qunit;
+       unsigned long            lge_edquot:1,
+                                /* true when minimum qunit is set */
+                                lge_qunit_set:1,
+                                /* qunit or edquot is changed - need
+                                * to send glimpse to appropriate slave */
+                                lge_qunit_nu:1,
+                                lge_edquot_nu:1;
+};
+
+struct lqe_glbl_data {
+       struct lqe_glbl_entry   *lqeg_arr;
+       /* number of initialised entries */
+       int                      lqeg_num_used;
+       /* number of allocated entries */
+       int                      lqeg_num_alloc;
 };
 
 /* Compartment within which lquota_entry are unique.
@@ -370,11 +397,37 @@ void lquota_lqe_debug0(struct lquota_entry *lqe,
 #define LQUOTA_CONSOLE(lqe, fmt, a...) \
        LQUOTA_DEBUG_LIMIT(D_CONSOLE, lqe, fmt, ## a)
 
-#define LQUOTA_DEBUG(lock, fmt, a...) do {                                 \
+#define LQUOTA_ELEVEL_LQES(level, env, fmt, a...) do {         \
+       int i;                                                  \
+       for (i = 0; i < qti_lqes_cnt(env); i++) {               \
+               LQUOTA_##level(qti_lqes(env)[i], fmt, ##a);     \
+       }                                                       \
+} while (0)
+#define LQUOTA_WARN_LQES(lqe, fmt, a...) \
+               LQUOTA_ELEVEL_LQES(WARN, env, fmt, ##a)
+#define LQUOTA_CONSOLE_LQES(lqe, fmt, a...) \
+               LQUOTA_ELEVEL_LQES(CONSOLE, env, fmt, ##a)
+#define LQUOTA_ERROR_LQES(lqe, fmt, a...) \
+               LQUOTA_ELEVEL_LQES(ERROR, env, fmt, ##a)
+
+#define LQUOTA_DEBUG(lqe, fmt, a...) do {                                 \
        LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_QUOTA, NULL);                \
        lquota_lqe_debug(&msgdata, D_QUOTA, NULL, lqe, "$$$ "fmt" ", ##a); \
 } while (0)
+
+#define LQUOTA_DEBUG_LQES(env, fmt, a...) do {                 \
+       int i;                                                  \
+       for (i = 0; i < qti_lqes_cnt(env); i++) {               \
+               LQUOTA_DEBUG(qti_lqes(env)[i], fmt, ##a);       \
+       }                                                       \
+} while (0)
+
+
 #else /* !LIBCFS_DEBUG */
+# define LQUOTA_DEBUG_LQES(lqe, fmt, a...) ((void)0)
+# define LQUOTA_ERROR_LQES(lqe, fmt, a...) ((void)0)
+# define LQUOTA_WARN_LQES(lqe, fmt, a...) ((void)0)
+# define LQUOTA_CONSOLE_LQES(lqe, fmt, a...) ((void)0)
 # define LQUOTA_DEBUG(lqe, fmt, a...) ((void)0)
 # define LQUOTA_ERROR(lqe, fmt, a...) ((void)0)
 # define LQUOTA_WARN(lqe, fmt, a...) ((void)0)
@@ -396,8 +449,11 @@ struct lquota_site *lquota_site_alloc(const struct lu_env *, void *, bool,
                                      short, struct lquota_entry_operations *);
 void lquota_site_free(const struct lu_env *, struct lquota_site *);
 /* quota entry operations */
-struct lquota_entry *lqe_locate(const struct lu_env *, struct lquota_site *,
-                               union lquota_id *);
+#define lqe_locate(env, site, id) lqe_locate_find(env, site, id, false)
+#define lqe_find(env, site, id) lqe_locate_find(env, site, id, true)
+struct lquota_entry *lqe_locate_find(const struct lu_env *,
+                                    struct lquota_site *,
+                                    union lquota_id *, bool);
 
 /* lquota_disk.c */
 struct dt_object *lquota_disk_dir_find_create(const struct lu_env *,
index cab5481..f785ac9 100644 (file)
@@ -110,6 +110,11 @@ static struct lu_device *qmt_device_fini(const struct lu_env *env,
        if (!qmt->qmt_child->dd_rdonly)
                qmt_stop_reba_thread(qmt);
 
+       if (qmt->qmt_root) {
+               dt_object_put(env, qmt->qmt_root);
+               qmt->qmt_root = NULL;
+       }
+
        /* disconnect from OSD */
        if (qmt->qmt_child_exp != NULL) {
                obd_disconnect(qmt->qmt_child_exp);
@@ -274,6 +279,7 @@ static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt,
        rc = qmt_pool_init(env, qmt);
        if (rc)
                GOTO(out, rc);
+
        EXIT;
 out:
        if (rc)
@@ -421,6 +427,10 @@ static const struct obd_ops qmt_obd_ops = {
        .o_owner        = THIS_MODULE,
        .o_connect      = qmt_device_obd_connect,
        .o_disconnect   = qmt_device_obd_disconnect,
+       .o_pool_new     = qmt_pool_new,
+       .o_pool_rem     = qmt_pool_rem,
+       .o_pool_add     = qmt_pool_add,
+       .o_pool_del     = qmt_pool_del,
 };
 
 /*
@@ -453,10 +463,9 @@ static int qmt_device_prepare(const struct lu_env *env,
                RETURN(rc);
        }
 
+       qmt->qmt_root = qmt_root;
        /* initialize on-disk indexes associated with each pool */
-       rc = qmt_pool_prepare(env, qmt, qmt_root);
-
-       dt_object_put(env, qmt_root);
+       rc = qmt_pool_prepare(env, qmt, qmt_root, NULL);
        RETURN(rc);
 }
 
index 74ad463..de9a314 100644 (file)
@@ -68,7 +68,7 @@ int qmt_lqe_set_default(const struct lu_env *env, struct qmt_pool_info *pool,
        if (lqe->lqe_id.qid_uid == 0)
                RETURN(0);
 
-       lqe_def = pool->qpi_grace_lqe[lqe->lqe_site->lqs_qtype];
+       lqe_def = pool->qpi_grace_lqe[lqe_qtype(lqe)];
 
        LQUOTA_DEBUG(lqe, "inherit default quota");
 
@@ -102,9 +102,10 @@ int qmt_lqe_set_default(const struct lu_env *env, struct qmt_pool_info *pool,
  * \param env - the environment passed by the caller
  * \param lqe - is the quota entry to refresh
  * \param arg - is the pointer to the qmt_pool_info structure
+ * \param find - don't create lqe on disk in case of ENOENT if true
  */
 static int qmt_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
-                       void *arg)
+                       void *arg, bool find)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
        struct qmt_pool_info    *pool = (struct qmt_pool_info *)arg;
@@ -119,6 +120,8 @@ static int qmt_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
 
        switch (rc) {
        case -ENOENT:
+               if (find)
+                       RETURN(-ENOENT);
                qmt_lqe_set_default(env, pool, lqe, true);
                break;
        case 0:
@@ -146,6 +149,9 @@ static int qmt_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
        else
                lqe->lqe_enforced  = true;
 
+       if (qmt_pool_global(pool))
+               lqe->lqe_is_global = 1;
+
        LQUOTA_DEBUG(lqe, "read");
        RETURN(0);
 }
@@ -192,28 +198,37 @@ struct lquota_entry_operations qmt_lqe_ops = {
  *
  * \param env     - is the environment passed by the caller
  * \param lqe     - is the quota entry associated with the identifier
- *                  subject to the change
+ *                  subject to the change. If it is NULL lqes array is
+ *                  taken from env with qti_lqes_env(env).
  * \param slv_obj - is the dt_object associated with the index file
- * \param restore - is a temporary storage for current quota settings which will
- *                  be restored if something goes wrong at index update time.
+ * \param sync    - make transaction sync if true
  */
 struct thandle *qmt_trans_start_with_slv(const struct lu_env *env,
                                         struct lquota_entry *lqe,
                                         struct dt_object *slv_obj,
-                                        struct qmt_lqe_restore *restore)
+                                        bool sync)
 {
        struct qmt_device       *qmt;
        struct thandle          *th;
-       int                      rc;
+       struct lquota_entry     **lqes;
+       struct qmt_lqe_restore  *restore;
+       int                      rc, i, lqes_cnt;
        ENTRY;
 
-       LASSERT(lqe != NULL);
-       LASSERT(lqe_is_master(lqe));
+       restore = qti_lqes_rstr(env);
+       if (!lqe) {
+               lqes_cnt = qti_lqes_cnt(env);
+               lqes = qti_lqes(env);
+       } else {
+               lqes_cnt = 1;
+               lqes = &lqe;
+       }
 
-       qmt = lqe2qpi(lqe)->qpi_qmt;
+       /* qmt is the same for all lqes, so take it from the 1st */
+       qmt = lqe2qpi(lqes[0])->qpi_qmt;
 
        if (slv_obj != NULL)
-               LQUOTA_DEBUG(lqe, "declare write for slv "DFID,
+               LQUOTA_DEBUG(lqes[0], "declare write for slv "DFID,
                             PFID(lu_object_fid(&slv_obj->do_lu)));
 
        /* start transaction */
@@ -221,15 +236,19 @@ struct thandle *qmt_trans_start_with_slv(const struct lu_env *env,
        if (IS_ERR(th))
                RETURN(th);
 
-       if (slv_obj == NULL)
+       if (sync)
                /* quota settings on master are updated synchronously for the
                 * time being */
                th->th_sync = 1;
 
        /* reserve credits for global index update */
-       rc = lquota_disk_declare_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id);
-       if (rc)
-               GOTO(out, rc);
+       for (i = 0; i < lqes_cnt; i++) {
+               rc = lquota_disk_declare_write(env, th,
+                                              LQE_GLB_OBJ(lqes[i]),
+                                              &lqes[i]->lqe_id);
+               if (rc)
+                       GOTO(out, rc);
+       }
 
        if (slv_obj != NULL) {
                /* reserve credits for slave index update */
@@ -252,11 +271,13 @@ out:
                             ", rc:%d", PFID(lu_object_fid(&slv_obj->do_lu)),
                             rc);
        } else {
-               restore->qlr_hardlimit = lqe->lqe_hardlimit;
-               restore->qlr_softlimit = lqe->lqe_softlimit;
-               restore->qlr_gracetime = lqe->lqe_gracetime;
-               restore->qlr_granted   = lqe->lqe_granted;
-               restore->qlr_qunit     = lqe->lqe_qunit;
+               for (i = 0; i < lqes_cnt; i++) {
+                       restore[i].qlr_hardlimit = lqes[i]->lqe_hardlimit;
+                       restore[i].qlr_softlimit = lqes[i]->lqe_softlimit;
+                       restore[i].qlr_gracetime = lqes[i]->lqe_gracetime;
+                       restore[i].qlr_granted   = lqes[i]->lqe_granted;
+                       restore[i].qlr_qunit     = lqes[i]->lqe_qunit;
+               }
        }
        return th;
 }
@@ -270,11 +291,24 @@ out:
  *                  be restored if something goes wrong at index update time.
  */
 struct thandle *qmt_trans_start(const struct lu_env *env,
-                               struct lquota_entry *lqe,
-                               struct qmt_lqe_restore *restore)
+                               struct lquota_entry *lqe)
 {
        LQUOTA_DEBUG(lqe, "declare write");
-       return qmt_trans_start_with_slv(env, lqe, NULL, restore);
+       return qmt_trans_start_with_slv(env, lqe, NULL, true);
+}
+
+int qmt_glb_write_lqes(const struct lu_env *env, struct thandle *th,
+                      __u32 flags, __u64 *ver)
+{
+       int i, rc;
+       rc = 0;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++) {
+               rc = qmt_glb_write(env, th, qti_lqes(env)[i], flags, ver);
+               if (rc)
+                       break;
+       }
+       return rc;
 }
 
 /*
@@ -345,7 +379,7 @@ int qmt_glb_write(const struct lu_env *env, struct thandle *th,
  * The entry must be at least read locked.
  *
  * \param env - the environment passed by the caller
- * \param lqe - is the quota entry associated with the identifier to look-up
+ * \param qid - is the quota id associated with the identifier to look-up
  *              in the slave index
  * \param slv_obj - is the dt_object associated with the slave index
  * \param granted - is the output parameter where to return how much space
@@ -353,7 +387,7 @@ int qmt_glb_write(const struct lu_env *env, struct thandle *th,
  *
  * \retval    - 0 on success, appropriate error on failure
  */
-int qmt_slv_read(const struct lu_env *env, struct lquota_entry *lqe,
+int qmt_slv_read(const struct lu_env *env, union lquota_id *qid,
                 struct dt_object *slv_obj, __u64 *granted)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
@@ -361,15 +395,11 @@ int qmt_slv_read(const struct lu_env *env, struct lquota_entry *lqe,
        int                      rc;
        ENTRY;
 
-       LASSERT(lqe != NULL);
-       LASSERT(lqe_is_master(lqe));
-       LASSERT(lqe_is_locked(lqe));
-
-       LQUOTA_DEBUG(lqe, "read slv "DFID,
-                    PFID(lu_object_fid(&slv_obj->do_lu)));
+       CDEBUG(D_QUOTA, "read id:%llu from slv "DFID"\n",
+              qid->qid_uid, PFID(lu_object_fid(&slv_obj->do_lu)));
 
        /* read slave record from disk */
-       rc = lquota_disk_read(env, slv_obj, &lqe->lqe_id,
+       rc = lquota_disk_read(env, slv_obj, qid,
                              (struct dt_rec *)slv_rec);
        switch (rc) {
        case -ENOENT:
@@ -380,12 +410,12 @@ int qmt_slv_read(const struct lu_env *env, struct lquota_entry *lqe,
                *granted = slv_rec->qsr_granted;
                break;
        default:
-               LQUOTA_ERROR(lqe, "failed to read slave record "DFID,
-                            PFID(lu_object_fid(&slv_obj->do_lu)));
+               CERROR("Failed to read slave record for %llu from "DFID"\n",
+                      qid->qid_uid, PFID(lu_object_fid(&slv_obj->do_lu)));
                RETURN(rc);
        }
 
-       LQUOTA_DEBUG(lqe, "successful slv read %llu", *granted);
+       CDEBUG(D_QUOTA, "Successful slv read %llu\n", *granted);
 
        RETURN(0);
 }
@@ -466,13 +496,13 @@ int qmt_validate_limits(struct lquota_entry *lqe, __u64 hard, __u64 soft)
  * \param lqe - is the quota entry to check
  * \param now - is the current time in second used for grace time managment
  */
-void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
+bool qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
 {
        struct qmt_pool_info    *pool = lqe2qpi(lqe);
        ENTRY;
 
        if (!lqe->lqe_enforced || lqe->lqe_id.qid_uid == 0)
-               RETURN_EXIT;
+               RETURN(false);
 
        if (!lqe->lqe_edquot) {
                /* space exhausted flag not set, let's check whether it is time
@@ -480,7 +510,7 @@ void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
 
                if (!qmt_space_exhausted(lqe, now))
                        /* the qmt still has available space */
-                       RETURN_EXIT;
+                       RETURN(false);
 
                /* See comment in qmt_adjust_qunit(). LU-4139 */
                if (qmt_hard_exhausted(lqe) ||
@@ -491,19 +521,19 @@ void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
                         * still hope that the rebalancing process might free
                         * up some quota space */
                        if (lqe->lqe_qunit != pool->qpi_least_qunit)
-                               RETURN_EXIT;
+                               RETURN(false);
 
                        /* least qunit value not sent to all slaves yet */
                        if (lqe->lqe_revoke_time == 0)
-                               RETURN_EXIT;
+                               RETURN(false);
 
                        /* Let's give more time to slave to release space */
                        lapse = ktime_get_seconds() - QMT_REBA_TIMEOUT;
                        if (lqe->lqe_may_rel != 0 && lqe->lqe_revoke_time > lapse)
-                               RETURN_EXIT;
+                               RETURN(false);
                } else {
                        if (lqe->lqe_qunit > pool->qpi_soft_least_qunit)
-                               RETURN_EXIT;
+                               RETURN(false);
                }
 
                /* set edquot flag */
@@ -514,14 +544,14 @@ void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
 
                if (qmt_space_exhausted(lqe, now))
                        /* the qmt still has not space */
-                       RETURN_EXIT;
+                       RETURN(false);
 
                if (lqe->lqe_hardlimit != 0 &&
                    lqe->lqe_granted + pool->qpi_least_qunit >
                                                        lqe->lqe_hardlimit)
                        /* we clear the flag only once at least one least qunit
                         * is available */
-                       RETURN_EXIT;
+                       RETURN(false);
 
                /* clear edquot flag */
                lqe->lqe_edquot = false;
@@ -531,8 +561,7 @@ void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
 
        /* let's notify slave by issuing glimpse on per-ID lock.
         * the rebalance thread will take care of this */
-       qmt_id_lock_notify(pool->qpi_qmt, lqe);
-       EXIT;
+       RETURN(true);
 }
 
 /* Using least_qunit when over block softlimit will seriously impact the
@@ -578,8 +607,8 @@ __u64 qmt_alloc_expand(struct lquota_entry *lqe, __u64 granted, __u64 spare)
 
        LASSERT(lqe->lqe_enforced && lqe->lqe_qunit != 0);
 
-       slv_cnt = lqe2qpi(lqe)->qpi_slv_nr[lqe->lqe_site->lqs_qtype];
-       qunit   = lqe->lqe_qunit;
+       slv_cnt = qpi_slv_nr(lqe2qpi(lqe), lqe_qtype(lqe));
+       qunit = lqe->lqe_qunit;
 
        /* See comment in qmt_adjust_qunit(). LU-4139. */
        if (lqe->lqe_softlimit != 0) {
@@ -628,10 +657,12 @@ __u64 qmt_alloc_expand(struct lquota_entry *lqe, __u64 granted, __u64 spare)
  *
  * \param env - the environment passed by the caller
  * \param lqe - is the qid entry to be adjusted
+ * \retval true - need reseed glbe array
  */
-void qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
+bool qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
 {
        struct qmt_pool_info    *pool = lqe2qpi(lqe);
+       bool                     need_reseed = false;
        int                      slv_cnt;
        __u64                    qunit, limit, qunit2 = 0;
        ENTRY;
@@ -640,13 +671,20 @@ void qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
 
        if (!lqe->lqe_enforced || lqe->lqe_id.qid_uid == 0)
                /* no quota limits */
-               RETURN_EXIT;
+               RETURN(need_reseed);
 
        /* record how many slaves have already registered */
-       slv_cnt = pool->qpi_slv_nr[lqe->lqe_site->lqs_qtype];
-       if (slv_cnt == 0)
+       slv_cnt = qpi_slv_nr(pool, lqe_qtype(lqe));
+       if (slv_cnt == 0) {
+               /* Pool has no slaves anymore. Qunit will be adjusted
+                * again when new slaves are added. */
+               if (lqe->lqe_qunit) {
+                       qunit = 0;
+                       GOTO(done, qunit);
+               }
                /* wait for at least one slave to join */
-               RETURN_EXIT;
+               RETURN(need_reseed);
+       }
 
        /* Qunit calculation is based on soft limit, if any, hard limit
         * otherwise. This means that qunit is shrunk to the minimum when
@@ -667,7 +705,7 @@ void qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
        } else {
                LQUOTA_ERROR(lqe, "enforced bit set, but neither hard nor soft "
                             "limit are set");
-               RETURN_EXIT;
+               RETURN(need_reseed);
        }
 
        qunit = lqe->lqe_qunit == 0 ? pool->qpi_least_qunit : lqe->lqe_qunit;
@@ -717,7 +755,7 @@ void qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
 done:
        if (lqe->lqe_qunit == qunit)
                /* keep current qunit */
-               RETURN_EXIT;
+               RETURN(need_reseed);
 
        LQUOTA_DEBUG(lqe, "%s qunit to %llu",
                     lqe->lqe_qunit < qunit ? "increasing" : "decreasing",
@@ -726,29 +764,430 @@ done:
        /* store new qunit value */
        swap(lqe->lqe_qunit, qunit);
 
+       /* reseed glbe array and notify
+        * slave if qunit was shrunk */
+       need_reseed = true;
        /* reset revoke time */
        lqe->lqe_revoke_time = 0;
 
-       if (lqe->lqe_qunit < qunit)
-               /* let's notify slave of qunit shrinking */
-               qmt_id_lock_notify(pool->qpi_qmt, lqe);
-       else if (lqe->lqe_qunit == pool->qpi_least_qunit)
+       if (lqe->lqe_qunit >= qunit &&
+           (lqe->lqe_qunit == pool->qpi_least_qunit)) {
                /* initial qunit value is the smallest one */
                lqe->lqe_revoke_time = ktime_get_seconds();
-       EXIT;
+       }
+       RETURN(need_reseed);
 }
 
+bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env,
+                                   struct qmt_device *qmt,
+                                   __u64 now, bool edquot,
+                                   bool qunit, __u32 qb_flags)
+{
+       struct lquota_entry *lqe_gl, *lqe;
+       bool need_reseed = false;
+       int i;
+
+       lqe_gl = qti_lqes_glbl(env);
+
+       for (i = 0; i < qti_lqes_cnt(env); i++) {
+               lqe = qti_lqes(env)[i];
+               if (qunit)
+                       need_reseed |= qmt_adjust_qunit(env, lqe);
+               if (edquot)
+                       need_reseed |= qmt_adjust_edquot(lqe, now);
+       }
+
+       LASSERT(lqe_gl);
+       if (!lqe_gl->lqe_glbl_data &&
+           (req_has_rep(qb_flags) || req_is_rel(qb_flags))) {
+               if (need_reseed)
+                       CWARN("%s: can't notify - lge_glbl_data is not set",
+                             qmt->qmt_svname);
+               return need_reseed;
+       }
+
+       if (lqe_gl->lqe_glbl_data && need_reseed) {
+               qmt_seed_glbe_all(env, lqe_gl->lqe_glbl_data, qunit, edquot);
+               qmt_id_lock_notify(qmt, lqe_gl);
+       }
+       return need_reseed;
+}
+
+
 /*
  * Adjust qunit & edquot flag in case it wasn't initialized already (e.g.
  * limit set while no slaves were connected yet)
  */
-void qmt_revalidate(const struct lu_env *env, struct lquota_entry *lqe)
+bool qmt_revalidate(const struct lu_env *env, struct lquota_entry *lqe)
 {
+       bool need_notify = false;
+
        if (lqe->lqe_qunit == 0) {
                /* lqe was read from disk, but neither qunit, nor edquot flag
                 * were initialized */
-               qmt_adjust_qunit(env, lqe);
+               need_notify = qmt_adjust_qunit(env, lqe);
                if (lqe->lqe_qunit != 0)
-                       qmt_adjust_edquot(lqe, ktime_get_real_seconds());
+                       need_notify |= qmt_adjust_edquot(lqe,
+                                               ktime_get_real_seconds());
+       }
+
+       return need_notify;
+}
+
+void qmt_revalidate_lqes(const struct lu_env *env,
+                        struct qmt_device *qmt, __u32 qb_flags)
+{
+       struct lquota_entry *lqe_gl = qti_lqes_glbl(env);
+       bool need_notify = false;
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++)
+               need_notify |= qmt_revalidate(env, qti_lqes(env)[i]);
+
+       /* There could be no ID lock at the moment of reconciliation.
+        * As a result, lqe global data is not initialised yet. That is
+        * OK for release and report requests. */
+       if (!lqe_gl->lqe_glbl_data &&
+           (req_is_rel(qb_flags) || req_has_rep(qb_flags)))
+               return;
+
+       if (need_notify) {
+               qmt_seed_glbe(env, lqe_gl->lqe_glbl_data);
+               qmt_id_lock_notify(qmt, lqe_gl);
+       }
+}
+
+inline void qti_lqes_init(const struct lu_env *env)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+
+       qti->qti_lqes_cnt = 0;
+       qti->qti_glbl_lqe_idx = 0;
+       qti->qti_lqes_num = QMT_MAX_POOL_NUM;
+}
+
+inline int qti_lqes_add(const struct lu_env *env, struct lquota_entry *lqe)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+
+       if (qti->qti_lqes_cnt > qti->qti_lqes_num) {
+               struct lquota_entry     **lqes;
+               lqes = qti->qti_lqes;
+               OBD_ALLOC(lqes, sizeof(lqe) * qti->qti_lqes_num * 2);
+               if (!lqes)
+                       return -ENOMEM;
+               memcpy(lqes, qti_lqes(env), qti->qti_lqes_cnt * sizeof(lqe));
+               /* Don't need to free, if it is the very 1st allocation */
+               if (qti->qti_lqes_num > QMT_MAX_POOL_NUM)
+                       OBD_FREE(qti->qti_lqes,
+                                qti->qti_lqes_num * sizeof(lqe));
+               qti->qti_lqes = lqes;
+               qti->qti_lqes_num *= 2;
        }
+
+       if (lqe->lqe_is_global)
+               qti->qti_glbl_lqe_idx = qti->qti_lqes_cnt;
+       qti_lqes(env)[qti->qti_lqes_cnt++] = lqe;
+
+       /* The pool could be accessed directly from lqe, so take an
+        * extra reference that is released in qti_lqes_fini */
+       qpi_getref(lqe2qpi(lqe));
+
+       CDEBUG(D_QUOTA, "LQE %p %lu is added, lqe_cnt %d lqes_num %d\n",
+                        lqe, (long unsigned)lqe->lqe_id.qid_uid,
+                        qti->qti_lqes_cnt, qti->qti_lqes_num);
+       LASSERT(qti->qti_lqes_num != 0);
+
+       return 0;
+}
+
+inline void qti_lqes_del(const struct lu_env *env, int index)
+{
+       struct lquota_entry     **lqes;
+       int lqes_cnt = qti_lqes_cnt(env);
+       int lqep_size = sizeof(struct lquota_entry *);
+
+       if (index == 0) {
+               /* We can't handle non-global lqes correctly without
+                * the global lqe located at index 0. If we tried to
+                * do so, something would go wrong. */
+               LQUOTA_ERROR(qti_lqes_glbl(env),
+                            "quota: cannot remove lqe at index 0 as it is global");
+               LASSERT(qti_lqes_glbl(env)->lqe_is_global);
+               return;
+       }
+       lqes = qti_lqes(env);
+       qpi_putref(env, lqe2qpi(lqes[index]));
+       lqe_putref(lqes[index]);
+       memcpy((unsigned char *)lqes + index * lqep_size,
+              (unsigned char *)lqes + (index + 1) * lqep_size,
+              (lqes_cnt - index - 1) * lqep_size);
+       qti_lqes_cnt(env)--;
+}
+
+inline void qti_lqes_fini(const struct lu_env *env)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct lquota_entry     **lqes = qti->qti_lqes;
+       int i;
+
+       lqes = qti_lqes(env);
+       for (i = 0; i < qti->qti_lqes_cnt; i++) {
+               qpi_putref(env, lqe2qpi(lqes[i]));
+               lqe_putref(lqes[i]);
+       }
+
+       if (qti->qti_lqes_num > QMT_MAX_POOL_NUM)
+               OBD_FREE(qti->qti_lqes,
+                        qti->qti_lqes_num * sizeof(struct lquota_entry *));
+}
+
+inline int qti_lqes_min_qunit(const struct lu_env *env)
+{
+       int i, min, qunit;
+
+       for (i = 1, min = qti_lqe_qunit(env, 0); i < qti_lqes_cnt(env); i++) {
+               qunit = qti_lqe_qunit(env, i);
+               if (qunit < min)
+                       min = qunit;
+       }
+
+       return min;
+}
+
+inline int qti_lqes_edquot(const struct lu_env *env)
+{
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++) {
+               if (qti_lqes(env)[i]->lqe_edquot)
+                       return 1;
+       }
+
+       return 0;
+}
+
+inline int qti_lqes_restore_init(const struct lu_env *env)
+{
+       int rc = 0;
+
+       if (qti_lqes_cnt(env) > QMT_MAX_POOL_NUM) {
+               OBD_ALLOC(qmt_info(env)->qti_lqes_rstr,
+                         qti_lqes_cnt(env) * sizeof(struct qmt_lqe_restore));
+               if (!qti_lqes_rstr(env))
+                       rc = -ENOMEM;
+       }
+
+       return rc;
+}
+
+inline void qti_lqes_restore_fini(const struct lu_env *env)
+{
+       if (qti_lqes_cnt(env) > QMT_MAX_POOL_NUM)
+               OBD_FREE(qmt_info(env)->qti_lqes_rstr,
+                        qti_lqes_cnt(env) * sizeof(struct qmt_lqe_restore));
+}
+
+inline void qti_lqes_write_lock(const struct lu_env *env)
+{
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++)
+               lqe_write_lock(qti_lqes(env)[i]);
+}
+
+inline void qti_lqes_write_unlock(const struct lu_env *env)
+{
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++)
+               lqe_write_unlock(qti_lqes(env)[i]);
+}
+
+#define QMT_INIT_SLV_CNT       64
+struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *pool, int qtype)
+{
+       struct lqe_glbl_data    *lgd;
+       struct lqe_glbl_entry   *lqeg_arr;
+       int                      slv_cnt, glbe_num;
+
+       OBD_ALLOC(lgd, sizeof(struct lqe_glbl_data));
+       if (!lgd)
+               RETURN(NULL);
+
+       slv_cnt = qpi_slv_nr_by_rtype(pool, qtype);
+
+       glbe_num = slv_cnt < QMT_INIT_SLV_CNT ? QMT_INIT_SLV_CNT : slv_cnt;
+       OBD_ALLOC(lqeg_arr, sizeof(struct lqe_glbl_entry) * glbe_num);
+       if (!lqeg_arr) {
+               OBD_FREE(lgd, sizeof(struct lqe_glbl_data));
+               RETURN(NULL);
+       }
+
+       CDEBUG(D_QUOTA, "slv_cnt %d glbe_num %d\n", slv_cnt, glbe_num);
+
+       lgd->lqeg_num_used = slv_cnt;
+       lgd->lqeg_num_alloc = glbe_num;
+       lgd->lqeg_arr = lqeg_arr;
+
+       RETURN(lgd);
+}
+
+void qmt_free_lqe_gd(struct lqe_glbl_data *lgd)
+{
+       OBD_FREE(lgd->lqeg_arr,
+                sizeof(struct lqe_glbl_entry) * lgd->lqeg_num_alloc);
+       OBD_FREE(lgd, sizeof(struct lqe_glbl_data));
+}
+
+void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd,
+                      bool qunit, bool edquot)
+{
+       struct rw_semaphore     *sem = NULL;
+       struct qmt_pool_info    *qpi;
+       int                      i, j, idx;
+       ENTRY;
+
+       /* lqes array is sorted by qunit - the first entry has minimum qunit.
+        * Thus start seeding global qunit's array beginning from the 1st lqe
+        * and appropriate pool. If pools overlapped, slaves from this
+        * overlapping get minimum qunit value.
+        * user1: pool1, pool2, pool_glbl;
+        * pool1: OST1; user1_qunit = 10M;
+        * pool2: OST0, OST1, OST2; user1_qunit = 30M;
+        * pool_glbl: OST0, OST1, OST2, OST3; user1_qunit = 160M;
+        * qunit array after seeding should be:
+        * OST0: 30M; OST1: 10M; OST2: 30M; OST3: 160M; */
+
+       /* edquot resetup algorithm works fine
+        * with unsorted lqes */
+       if (qunit)
+               qmt_lqes_sort(env);
+
+       for (i = 0; i < lgd->lqeg_num_used; i++) {
+               lgd->lqeg_arr[i].lge_qunit_set = 0;
+               lgd->lqeg_arr[i].lge_qunit_nu = 0;
+               lgd->lqeg_arr[i].lge_edquot_nu = 0;
+       }
+
+       for (i = 0; i < qti_lqes_cnt(env); i++) {
+               struct lquota_entry *lqe = qti_lqes(env)[i];
+               int slaves_cnt;
+
+               CDEBUG(D_QUOTA, "lqes_cnt %d, i %d\n", qti_lqes_cnt(env), i);
+               qpi = lqe2qpi(lqe);
+               if (qmt_pool_global(qpi)) {
+                       slaves_cnt = qpi_slv_nr_by_rtype(lqe2qpi(lqe),
+                                                        lqe_qtype(lqe));
+               } else {
+                       sem = qmt_sarr_rwsem(qpi);
+                       down_read(sem);
+                       slaves_cnt = qmt_sarr_count(qpi);
+               }
+
+               for (j = 0; j < slaves_cnt; j++) {
+                       idx = qmt_sarr_get_idx(qpi, j);
+                       LASSERT(idx >= 0);
+
+                       if (edquot) {
+                               int lge_edquot, new_edquot, edquot_nu;
+
+                               lge_edquot = lgd->lqeg_arr[idx].lge_edquot;
+                               edquot_nu = lgd->lqeg_arr[idx].lge_edquot_nu;
+                               new_edquot = lqe->lqe_edquot;
+
+                               if (lge_edquot == new_edquot ||
+                                   (edquot_nu && lge_edquot == 1))
+                                       goto qunit_lbl;
+                               lgd->lqeg_arr[idx].lge_edquot = new_edquot;
+                               /* it is needed for the following case:
+                                * initial values for idx i -
+                                * lqe_edquot = 1, lqe_edquot_nu == 0;
+                                * 1: new_edquot == 0 ->
+                                *      lqe_edquot = 0, lqe_edquot_nu = 1;
+                                * 2: new_edquot == 1 ->
+                                *      lqe_edquot = 1, lqe_edquot_nu = 0;
+                                * At the 2nd iteration lge_edquot comes back
+                                * to 1, so no changes and we don't need
+                                * to notify slave. */
+                               lgd->lqeg_arr[idx].lge_edquot_nu = !edquot_nu;
+                       }
+qunit_lbl:
+                       if (qunit) {
+                               __u64 lge_qunit, new_qunit;
+
+                               CDEBUG(D_QUOTA,
+                                      "idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n",
+                                      idx, lgd->lqeg_arr[idx].lge_qunit_set,
+                                      lgd->lqeg_arr[idx].lge_qunit,
+                                      lqe->lqe_qunit);
+                               /* lge for this idx is already set
+                                * on previous iteration */
+                               if (lgd->lqeg_arr[idx].lge_qunit_set)
+                                       continue;
+                               lge_qunit = lgd->lqeg_arr[idx].lge_qunit;
+                               new_qunit = lqe->lqe_qunit;
+                               /* qunit might not be set,
+                                * so use global lqe's qunit */
+                               if (!new_qunit)
+                                       continue;
+
+                               if (lge_qunit != new_qunit)
+                                       lgd->lqeg_arr[idx].lge_qunit =
+                                                               new_qunit;
+
+                               /* TODO: initially slaves notification was done
+                                * only for qunit shrinking. Should we always
+                                * notify slaves with new qunit ? */
+                               if (lge_qunit > new_qunit)
+                                       lgd->lqeg_arr[idx].lge_qunit_nu = 1;
+                               lgd->lqeg_arr[idx].lge_qunit_set = 1;
+                       }
+               }
+
+               if (!qmt_pool_global(qpi))
+                       up_read(sem);
+       }
+       /* TODO: only for debug purposes - remove it later */
+       for (i = 0; i < lgd->lqeg_num_used; i++)
+               CDEBUG(D_QUOTA,
+                       "lgd ost %d, qunit %lu nu %d;  edquot %d nu %d\n",
+                       i, (long unsigned)lgd->lqeg_arr[i].lge_qunit,
+                       lgd->lqeg_arr[i].lge_qunit_nu,
+                       lgd->lqeg_arr[i].lge_edquot,
+                       lgd->lqeg_arr[i].lge_edquot_nu);
+
+       EXIT;
+}
+
+void qmt_setup_lqe_gd(const struct lu_env *env, struct qmt_device *qmt,
+                     struct lquota_entry *lqe, struct lqe_glbl_data *lgd,
+                     int pool_type)
+{
+       __u64                    qunit;
+       bool                     edquot;
+       int                      i;
+
+       qunit = lqe->lqe_qunit;
+       edquot = lqe->lqe_edquot;
+
+       /* Firstly set all elements in array with
+        * qunit and edquot of global pool */
+       for (i = 0; i < lgd->lqeg_num_used; i++) {
+               lgd->lqeg_arr[i].lge_qunit = qunit;
+               lgd->lqeg_arr[i].lge_edquot = edquot;
+               /* It is the very first lvb setup - qunit and other flags
+                * will be sent to slaves during qmt_lvbo_fill. */
+               lgd->lqeg_arr[i].lge_qunit_nu = 0;
+               lgd->lqeg_arr[i].lge_edquot_nu = 0;
+       }
+
+       qmt_pool_lqes_lookup_spec(env, qmt, pool_type,
+                                 lqe_qtype(lqe), &lqe->lqe_id);
+       qmt_seed_glbe(env, lgd);
+
+       lqe->lqe_glbl_data = lgd;
+       qmt_id_lock_notify(qmt, lqe);
+
+       qti_lqes_fini(env);
 }
index d63f240..3b7b6cf 100644 (file)
  */
 static int qmt_get(const struct lu_env *env, struct qmt_device *qmt,
                   __u8 restype, __u8 qtype, union lquota_id *id,
-                  __u64 *hard, __u64 *soft, __u64 *time, bool is_default)
+                  __u64 *hard, __u64 *soft, __u64 *time, bool is_default,
+                  char *pool_name)
 {
        struct lquota_entry     *lqe;
        ENTRY;
 
        LASSERT(!is_default || id->qid_uid == 0);
+       if (pool_name && !strnlen(pool_name, LOV_MAXPOOLNAME))
+               pool_name = NULL;
 
        /* look-up lqe structure containing quota settings */
-       lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, id);
+       lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, id, pool_name);
        if (IS_ERR(lqe))
                RETURN(PTR_ERR(lqe));
 
@@ -101,6 +104,34 @@ static int qmt_entry_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                                true, true);
 }
 
+static void qmt_set_id_notify(const struct lu_env *env, struct qmt_device *qmt,
+                             struct lquota_entry *lqe)
+{
+       struct lquota_entry *lqe_gl;
+       int rc;
+
+       lqe_gl = lqe->lqe_is_global ? lqe : NULL;
+       rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
+                                      lqe_qtype(lqe), &lqe->lqe_id);
+       if (!qti_lqes_cnt(env))
+               GOTO(lqes_fini, rc);
+
+       if (!lqe_gl && qti_lqes_glbl(env)->lqe_is_global)
+               lqe_gl = qti_lqes_glbl(env);
+
+       if (!lqe_gl)
+               GOTO(lqes_fini, rc);
+
+       if (lqe_gl->lqe_glbl_data)
+               qmt_seed_glbe(env, lqe_gl->lqe_glbl_data);
+       /* Even if slaves haven't enqueued the quota lock yet,
+        * lqe_revoke_time still needs to be set in qmt_id_lock_glimpse
+        * in case qpi_least_qunit is reached */
+       qmt_id_lock_notify(qmt, lqe_gl);
+lqes_fini:
+       qti_lqes_fini(env);
+}
+
 /*
  * Update quota settings for a given lqe.
  *
@@ -120,19 +151,23 @@ int qmt_set_with_lqe(const struct lu_env *env, struct qmt_device *qmt,
                     struct lquota_entry *lqe, __u64 hard, __u64 soft,
                     __u64 time, __u32 valid, bool is_default, bool is_updated)
 {
-       struct qmt_thread_info  *qti = qmt_info(env);
-       struct thandle          *th = NULL;
-       time64_t now;
-       __u64                    ver;
-       bool                     dirtied = false;
-       int                      rc = 0;
+       struct thandle *th = NULL;
+       time64_t now = 0;
+       __u64 ver;
+       bool dirtied = false;
+       int rc = 0;
+       int need_id_notify = 0;
        ENTRY;
 
        /* need to write back to global quota file? */
        if (!is_updated) {
+               /* By default we should have only 1 lqe here,
+                * so no allocations should be done. */
+               if (qti_lqes_restore_init(env))
+                       GOTO(out_nolock, rc = -ENOMEM);
                /* allocate & start transaction with enough credits to update
                 * quota  settings in the global index file */
-               th = qmt_trans_start(env, lqe, &qti->qti_restore);
+               th = qmt_trans_start(env, lqe);
                if (IS_ERR(th))
                        GOTO(out_nolock, rc = PTR_ERR(th));
        }
@@ -170,7 +205,8 @@ int qmt_set_with_lqe(const struct lu_env *env, struct qmt_device *qmt,
 
 quota_set:
                /* recompute qunit in case it was never initialized */
-               qmt_revalidate(env, lqe);
+               if (qmt_revalidate(env, lqe))
+                       need_id_notify = 1;
 
                /* clear grace time */
                if (lqe->lqe_softlimit == 0 ||
@@ -207,7 +243,7 @@ quota_set:
                        rc = qmt_glb_write(env, th, lqe, LQUOTA_BUMP_VER, &ver);
                        if (rc) {
                                /* restore initial quota settings */
-                               qmt_restore(lqe, &qti->qti_restore);
+                               qmt_restore(lqe, &qti_lqes_rstr(env)[0]);
                                GOTO(out, rc);
                        }
                } else {
@@ -215,17 +251,16 @@ quota_set:
                }
 
                /* compute new qunit value now that we have modified the quota
-                * settings */
-               qmt_adjust_qunit(env, lqe);
-
-               /* clear/set edquot flag as needed */
-               qmt_adjust_edquot(lqe, now);
+                * settings or clear/set edquot flag if needed */
+               if (qmt_adjust_qunit(env, lqe) || qmt_adjust_edquot(lqe, now))
+                       need_id_notify |= 1;
        }
        EXIT;
 out:
        lqe_write_unlock(lqe);
 
 out_nolock:
+       qti_lqes_restore_fini(env);
        if (th != NULL && !IS_ERR(th))
                dt_trans_stop(env, qmt->qmt_child, th);
 
@@ -239,7 +274,13 @@ out_nolock:
                        iter_data.qeid_qmt = qmt;
                        cfs_hash_for_each_safe(lqe->lqe_site->lqs_hash,
                                               qmt_entry_iter_cb, &iter_data);
+                       /* Always notify slaves with default values. Don't
+                        * care about overhead as only changed values will
+                        * be sent (see qmt_id_lock_cb for details). */
+                       need_id_notify = 1;
                }
+               if (need_id_notify && !is_updated)
+                       qmt_set_id_notify(env, qmt, lqe);
        }
 
        return rc;
@@ -265,20 +306,22 @@ out_nolock:
 static int qmt_set(const struct lu_env *env, struct qmt_device *qmt,
                   __u8 restype, __u8 qtype, union lquota_id *id,
                   __u64 hard, __u64 soft, __u64 time, __u32 valid,
-                  bool is_default, bool is_updated)
+                  bool is_default, bool is_updated, char *pool_name)
 {
        struct lquota_entry *lqe;
        int rc;
        ENTRY;
 
+       if (pool_name && !strnlen(pool_name, LOV_MAXPOOLNAME))
+               pool_name = NULL;
+
        /* look-up quota entry associated with this ID */
-       lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, id);
+       lqe = qmt_pool_lqe_lookup(env, qmt, restype, qtype, id, pool_name);
        if (IS_ERR(lqe))
                        RETURN(PTR_ERR(lqe));
 
        rc = qmt_set_with_lqe(env, qmt, lqe, hard, soft, time, valid,
                              is_default, is_updated);
-
        lqe_putref(lqe);
        RETURN(rc);
 }
@@ -293,12 +336,13 @@ static int qmt_set(const struct lu_env *env, struct qmt_device *qmt,
 static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                        struct obd_quotactl *oqctl)
 {
-       struct qmt_thread_info  *qti = qmt_info(env);
-       union lquota_id         *id  = &qti->qti_id;
-       struct qmt_device       *qmt = lu2qmt_dev(ld);
-       struct obd_dqblk        *dqb = &oqctl->qc_dqblk;
-       int                      rc = 0;
-       bool                     is_default = false;
+       struct qmt_thread_info *qti = qmt_info(env);
+       union lquota_id *id  = &qti->qti_id;
+       struct qmt_device *qmt = lu2qmt_dev(ld);
+       struct obd_dqblk *dqb = &oqctl->qc_dqblk;
+       char *poolname;
+       int rc = 0;
+       bool is_default = false;
        ENTRY;
 
        LASSERT(qmt != NULL);
@@ -307,24 +351,32 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                /* invalid quota type */
                RETURN(-EINVAL);
 
+       poolname = LUSTRE_Q_CMD_IS_POOL(oqctl->qc_cmd) ?
+                       oqctl->qc_poolname : NULL;
+
        switch (oqctl->qc_cmd) {
 
        case Q_GETINFO:  /* read grace times */
+       case LUSTRE_Q_GETINFOPOOL:
                /* Global grace time is stored in quota settings of ID 0. */
                id->qid_uid = 0;
 
                /* read inode grace time */
-               rc = qmt_get(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, id,
-                            NULL, NULL, &oqctl->qc_dqinfo.dqi_igrace, false);
-               if (rc)
+               rc = qmt_get(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, id, NULL,
+                            NULL, &oqctl->qc_dqinfo.dqi_igrace,
+                            false, poolname);
+               /* There could be no MD pool, so try to find DT pool */
+               if (rc && rc != -ENOENT)
                        break;
 
                /* read block grace time */
-               rc = qmt_get(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, id,
-                            NULL, NULL, &oqctl->qc_dqinfo.dqi_bgrace, false);
+               rc = qmt_get(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, id, NULL,
+                            NULL, &oqctl->qc_dqinfo.dqi_bgrace,
+                            false, poolname);
                break;
 
        case Q_SETINFO:  /* modify grace times */
+       case LUSTRE_Q_SETINFOPOOL:
                /* setinfo should be using dqi->dqi_valid, but lfs incorrectly
                 * sets the valid flags in dqb->dqb_valid instead, try to live
                 * with that ... */
@@ -336,7 +388,8 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                        /* set inode grace time */
                        rc = qmt_set(env, qmt, LQUOTA_RES_MD, oqctl->qc_type,
                                     id, 0, 0, oqctl->qc_dqinfo.dqi_igrace,
-                                    QIF_TIMES, false, false);
+                                    QIF_TIMES, false, false,
+                                    poolname);
                        if (rc)
                                break;
                }
@@ -345,7 +398,8 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                        /* set block grace time */
                        rc = qmt_set(env, qmt, LQUOTA_RES_DT, oqctl->qc_type,
                                     id, 0, 0, oqctl->qc_dqinfo.dqi_bgrace,
-                                    QIF_TIMES, false, false);
+                                    QIF_TIMES, false, false,
+                                    poolname);
                break;
 
        case LUSTRE_Q_GETDEFAULT:
@@ -353,24 +407,27 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                /* fallthrough */
 
        case Q_GETQUOTA: /* consult quota limit */
+       case LUSTRE_Q_GETQUOTAPOOL:
                /* extract quota ID from quotactl request */
                id->qid_uid = oqctl->qc_id;
 
                /* look-up inode quota settings */
                rc = qmt_get(env, qmt, LQUOTA_RES_MD, oqctl->qc_type, id,
                             &dqb->dqb_ihardlimit, &dqb->dqb_isoftlimit,
-                            &dqb->dqb_itime, is_default);
-               if (rc)
+                            &dqb->dqb_itime, is_default, poolname);
+               /* There could be no MD pool, so try to find DT pool */
+               if (rc && rc != -ENOENT)
                        break;
+               else
+                       dqb->dqb_valid |= QIF_ILIMITS | QIF_ITIME;
 
-               dqb->dqb_valid |= QIF_ILIMITS | QIF_ITIME;
                /* master isn't aware of actual inode usage */
                dqb->dqb_curinodes = 0;
 
                /* look-up block quota settings */
                rc = qmt_get(env, qmt, LQUOTA_RES_DT, oqctl->qc_type, id,
                             &dqb->dqb_bhardlimit, &dqb->dqb_bsoftlimit,
-                            &dqb->dqb_btime, is_default);
+                            &dqb->dqb_btime, is_default, poolname);
                if (rc)
                        break;
 
@@ -384,6 +441,7 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                /* fallthrough */
 
        case Q_SETQUOTA: /* change quota limits */
+       case LUSTRE_Q_SETQUOTAPOOL:
                /* extract quota ID from quotactl request */
                id->qid_uid = oqctl->qc_id;
 
@@ -393,7 +451,7 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                                     id, dqb->dqb_ihardlimit,
                                     dqb->dqb_isoftlimit, dqb->dqb_itime,
                                     dqb->dqb_valid & QIF_IFLAGS, is_default,
-                                    false);
+                                    false, poolname);
                        if (rc)
                                break;
                }
@@ -404,7 +462,7 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                                     id, dqb->dqb_bhardlimit,
                                     dqb->dqb_bsoftlimit, dqb->dqb_btime,
                                     dqb->dqb_valid & QIF_BFLAGS, is_default,
-                                    false);
+                                    false, poolname);
                break;
 
        default:
@@ -416,11 +474,133 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
        RETURN(rc);
 }
 
+static inline
+void qmt_grant_lqes(const struct lu_env *env, __u64 *slv, __u64 cnt)
+{
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++)
+               qti_lqe_granted(env, i) += cnt;
+
+       *slv += cnt;
+}
+
+static inline bool qmt_lqes_can_rel(const struct lu_env *env, __u64 cnt)
+{
+       bool can_release = true;
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++) {
+               if (cnt > qti_lqe_granted(env, i)) {
+                       LQUOTA_ERROR(qti_lqes(env)[i],
+                                    "Can't release %llu that is larger than lqe_granted.\n",
+                                    cnt);
+                       can_release = false;
+               }
+       }
+       return can_release;
+}
+
+static inline void qmt_rel_lqes(const struct lu_env *env, __u64 *slv, __u64 cnt)
+{
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++)
+               qti_lqe_granted(env, i) -= cnt;
+
+       *slv -= cnt;
+}
+
+static inline bool qmt_lqes_cannot_grant(const struct lu_env *env, __u64 cnt)
+{
+       bool cannot_grant = false;
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++) {
+               if (qti_lqe_hard(env, i) != 0 &&
+                   qti_lqe_granted(env, i) + cnt > qti_lqe_hard(env, i)) {
+                       cannot_grant = true;
+                       break;
+               }
+       }
+       return cannot_grant;
+}
+
+static inline __u64 qmt_lqes_grant_some_quota(const struct lu_env *env)
+{
+       __u64 min_count, tmp;
+       bool flag = false;
+       int i;
+
+       for (i = 0, min_count = 0; i < qti_lqes_cnt(env); i++) {
+               if (!qti_lqes(env)[i]->lqe_enforced &&
+                   !qti_lqes(env)[i]->lqe_is_global)
+                       continue;
+
+               tmp = qti_lqe_hard(env, i) - qti_lqe_granted(env, i);
+               if (flag) {
+                       min_count = tmp < min_count ? tmp : min_count;
+               } else {
+                       flag = true;
+                       min_count = tmp;
+               }
+       }
+       return min_count;
+}
+
+static inline __u64 qmt_lqes_alloc_expand(const struct lu_env *env,
+                                         __u64 slv_granted, __u64 spare)
+{
+       __u64 min_count, tmp;
+       bool flag = false;
+       int i;
+
+       for (i = 0, min_count = 0; i < qti_lqes_cnt(env); i++) {
+               /* Don't take into account non-enforced lqes that belong
+                * to a non-global pool. These lqes are present in the array
+                * to keep lqe_granted accurate even for lqes without limits. */
+               if (!qti_lqes(env)[i]->lqe_enforced &&
+                   !qti_lqes(env)[i]->lqe_is_global)
+                       continue;
+
+               tmp = qmt_alloc_expand(qti_lqes(env)[i], slv_granted, spare);
+               if (flag) {
+                       min_count = tmp < min_count ? tmp : min_count;
+               } else {
+                       flag = true;
+                       min_count = tmp;
+               }
+       }
+       return min_count;
+}
+
+static inline void qmt_lqes_tune_grace(const struct lu_env *env, __u64 now)
+{
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++) {
+               struct lquota_entry *lqe;
+
+               lqe = qti_lqes(env)[i];
+               if (lqe->lqe_softlimit != 0) {
+                       if (lqe->lqe_granted > lqe->lqe_softlimit &&
+                           lqe->lqe_gracetime == 0) {
+                               /* First time over soft limit, let's start grace
+                                * timer */
+                               lqe->lqe_gracetime = now + qmt_lqe_grace(lqe);
+                       } else if (lqe->lqe_granted <= lqe->lqe_softlimit &&
+                                  lqe->lqe_gracetime != 0) {
+                               /* Clear grace timer */
+                               lqe->lqe_gracetime = 0;
+                       }
+               }
+       }
+}
+
 /*
  * Helper function to handle quota request from slave.
  *
  * \param env     - is the environment passed by the caller
- * \param lqe     - is the lquota_entry subject to the quota request
  * \param qmt     - is the master device
  * \param uuid    - is the uuid associated with the slave
  * \param qb_flags - are the quota request flags as packed in the quota_body
@@ -434,16 +614,16 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
  *         -EINPROGRESS : inform client to retry write/create
  *         -ve          : other appropriate errors
  */
-int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
-              struct qmt_device *qmt, struct obd_uuid *uuid, __u32 qb_flags,
-              __u64 qb_count, __u64 qb_usage, struct quota_body *repbody)
+int qmt_dqacq0(const struct lu_env *env, struct qmt_device *qmt,
+              struct obd_uuid *uuid, __u32 qb_flags, __u64 qb_count,
+              __u64 qb_usage, struct quota_body *repbody)
 {
-       struct qmt_thread_info  *qti = qmt_info(env);
        __u64                    now, count;
        struct dt_object        *slv_obj = NULL;
        __u64                    slv_granted, slv_granted_bck;
        struct thandle          *th = NULL;
        int                      rc, ret;
+       struct lquota_entry *lqe = qti_lqes_glbl(env);
        ENTRY;
 
        LASSERT(uuid != NULL);
@@ -455,6 +635,9 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
        if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RECOVERABLE_ERR))
                RETURN(-cfs_fail_val);
 
+       if (qti_lqes_restore_init(env))
+               RETURN(-ENOMEM);
+
        /* look-up index file associated with acquiring slave */
        slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, LQE_ROOT(lqe),
                                       lu_object_fid(&LQE_GLB_OBJ(lqe)->do_lu),
@@ -468,12 +651,13 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
 
        /* allocate & start transaction with enough credits to update
         * global & slave indexes */
-       th = qmt_trans_start_with_slv(env, lqe, slv_obj, &qti->qti_restore);
+       th = qmt_trans_start_with_slv(env, NULL, slv_obj, false);
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
-       lqe_write_lock(lqe);
-       LQUOTA_DEBUG(lqe, "dqacq starts uuid:%s flags:0x%x wanted:%llu"
+       qti_lqes_write_lock(env);
+
+       LQUOTA_DEBUG_LQES(env, "dqacq starts uuid:%s flags:0x%x wanted:%llu"
                     " usage:%llu", obd_uuid2str(uuid), qb_flags, qb_count,
                     qb_usage);
 
@@ -484,14 +668,14 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
                GOTO(out_locked, rc = -ESRCH);
 
        /* recompute qunit in case it was never initialized */
-       qmt_revalidate(env, lqe);
+       qmt_revalidate_lqes(env, qmt, qb_flags);
 
        /* slave just wants to acquire per-ID lock */
        if (req_is_acq(qb_flags) && qb_count == 0)
                GOTO(out_locked, rc = 0);
 
        /* fetch how much quota space is already granted to this slave */
-       rc = qmt_slv_read(env, lqe, slv_obj, &slv_granted);
+       rc = qmt_slv_read(env, &lqe->lqe_id, slv_obj, &slv_granted);
        if (rc) {
                LQUOTA_ERROR(lqe, "Failed to get granted for slave %s, rc=%d",
                             obd_uuid2str(uuid), rc);
@@ -507,18 +691,18 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
        if (req_is_rel(qb_flags)) {
                /* Slave would like to release quota space */
                if (slv_granted < qb_count ||
-                   lqe->lqe_granted < qb_count) {
+                   !qmt_lqes_can_rel(env, qb_count)) {
                        /* can't release more than granted */
-                       LQUOTA_ERROR(lqe,
-                                    "Release too much! uuid:%s release:%llu granted:%llu, total:%llu",
-                                    obd_uuid2str(uuid), qb_count,
-                                    slv_granted, lqe->lqe_granted);
+                       LQUOTA_ERROR_LQES(env,
+                                         "Release too much! uuid:%s release: %llu granted:%llu, total:%llu",
+                                         obd_uuid2str(uuid), qb_count,
+                                         slv_granted, lqe->lqe_granted);
                        GOTO(out_locked, rc = -EINVAL);
                }
 
                repbody->qb_count = qb_count;
                /* put released space back to global pool */
-               QMT_REL(lqe, slv_granted, qb_count);
+               qmt_rel_lqes(env, &slv_granted, qb_count);
                GOTO(out_write, rc = 0);
        }
 
@@ -527,18 +711,18 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
                 * out to be using more quota space than owned, so we adjust
                 * granted space regardless of the current state of affairs */
                repbody->qb_count = qb_usage - slv_granted;
-               QMT_GRANT(lqe, slv_granted, repbody->qb_count);
+               qmt_grant_lqes(env, &slv_granted, repbody->qb_count);
        }
 
        if (!req_is_acq(qb_flags) && !req_is_preacq(qb_flags))
                GOTO(out_write, rc = 0);
 
-       qmt_adjust_edquot(lqe, now);
-       if (lqe->lqe_edquot)
+       qmt_adjust_edquot_notify(env, qmt, now, qb_flags);
+       if (qti_lqes_edquot(env))
                /* no hope to claim further space back */
                GOTO(out_write, rc = -EDQUOT);
 
-       if (qmt_space_exhausted(lqe, now)) {
+       if (qmt_space_exhausted_lqes(env, now)) {
                /* might have some free space once rebalancing is completed */
                rc = req_is_acq(qb_flags) ? -EINPROGRESS : -EDQUOT;
                GOTO(out_write, rc);
@@ -549,41 +733,39 @@ int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
                 * reports in qb_count how much spare quota space it owns and we
                 * can grant back quota space which is consistent with qunit
                 * value. */
-
-               if (qb_count >= lqe->lqe_qunit)
+               if (qb_count >= qti_lqes_min_qunit(env))
                        /* slave already own the maximum it should */
                        GOTO(out_write, rc = 0);
 
-               count = qmt_alloc_expand(lqe, slv_granted, qb_count);
+               count = qmt_lqes_alloc_expand(env, slv_granted, qb_count);
                if (count == 0)
                        GOTO(out_write, rc = -EDQUOT);
 
                repbody->qb_count += count;
-               QMT_GRANT(lqe, slv_granted, count);
+               qmt_grant_lqes(env, &slv_granted, count);
                GOTO(out_write, rc = 0);
        }
 
        /* processing acquire request with clients waiting */
-       if (lqe->lqe_hardlimit != 0 &&
-           lqe->lqe_granted + qb_count > lqe->lqe_hardlimit) {
+       if (qmt_lqes_cannot_grant(env, qb_count)) {
                /* cannot grant as much as asked, but can still afford to grant
                 * some quota space back */
-               count = lqe->lqe_hardlimit - lqe->lqe_granted;
+               count = qmt_lqes_grant_some_quota(env);
                repbody->qb_count += count;
-               QMT_GRANT(lqe, slv_granted, count);
+               qmt_grant_lqes(env, &slv_granted, count);
                GOTO(out_write, rc = 0);
        }
 
        /* Whouhou! we can satisfy the slave request! */
        repbody->qb_count += qb_count;
-       QMT_GRANT(lqe, slv_granted, qb_count);
+       qmt_grant_lqes(env, &slv_granted, qb_count);
 
        /* Try to expand the acquired count for DQACQ */
-       count = qmt_alloc_expand(lqe, slv_granted, 0);
+       count = qmt_lqes_alloc_expand(env, slv_granted, 0);
        if (count != 0) {
                /* can even grant more than asked, it is like xmas ... */
                repbody->qb_count += count;
-               QMT_GRANT(lqe, slv_granted, count);
+               qmt_grant_lqes(env, &slv_granted, count);
                GOTO(out_write, rc = 0);
        }
 
@@ -593,35 +775,25 @@ out_write:
                GOTO(out_locked, rc);
 
        /* start/stop grace timer if required */
-       if (lqe->lqe_softlimit != 0) {
-               if (lqe->lqe_granted > lqe->lqe_softlimit &&
-                   lqe->lqe_gracetime == 0)
-                       /* first time over soft limit, let's start grace
-                        * timer */
-                       lqe->lqe_gracetime = now + qmt_lqe_grace(lqe);
-               else if (lqe->lqe_granted <= lqe->lqe_softlimit &&
-                        lqe->lqe_gracetime != 0)
-                       /* Clear grace timer */
-                       lqe->lqe_gracetime = 0;
-       }
+       qmt_lqes_tune_grace(env, now);
 
        /* Update slave index first since it is easier to roll back */
        ret = qmt_slv_write(env, th, lqe, slv_obj, LQUOTA_BUMP_VER,
                            &repbody->qb_slv_ver, slv_granted);
        if (ret) {
                /* restore initial quota settings */
-               qmt_restore(lqe, &qti->qti_restore);
+               qmt_restore_lqes(env);
                /* reset qb_count */
                repbody->qb_count = 0;
                GOTO(out_locked, rc = ret);
        }
 
        /* Update global index, no version bump needed */
-       ret = qmt_glb_write(env, th, lqe, 0, NULL);
+       ret = qmt_glb_write_lqes(env, th, 0, NULL);
        if (ret) {
                rc = ret;
                /* restore initial quota settings */
-               qmt_restore(lqe, &qti->qti_restore);
+               qmt_restore_lqes(env);
                /* reset qb_count */
                repbody->qb_count = 0;
 
@@ -633,21 +805,22 @@ out_write:
                                     "value rc:%d ret%d", rc, ret);
                        LBUG();
                }
-               qmt_adjust_edquot(lqe, now);
+               qmt_adjust_edquot_notify(env, qmt, now, qb_flags);
                GOTO(out_locked, rc);
        }
 
        /* Total granted has been changed, let's try to adjust the qunit
         * size according to the total granted & limits. */
-       qmt_adjust_qunit(env, lqe);
 
        /* clear/set edquot flag and notify slaves via glimpse if needed */
-       qmt_adjust_edquot(lqe, now);
+       qmt_adjust_and_notify(env, qmt, now, qb_flags);
 out_locked:
-       LQUOTA_DEBUG(lqe, "dqacq ends count:%llu ver:%llu rc:%d",
+       LQUOTA_DEBUG_LQES(env, "dqacq ends count:%llu ver:%llu rc:%d",
                     repbody->qb_count, repbody->qb_slv_ver, rc);
-       lqe_write_unlock(lqe);
+       qti_lqes_write_unlock(env);
 out:
+       qti_lqes_restore_fini(env);
+
        if (th != NULL && !IS_ERR(th))
                dt_trans_stop(env, qmt->qmt_child, th);
 
@@ -666,6 +839,55 @@ out:
 }
 
 /*
+ * Extract index from uuid or quota index file name.
+ *
+ * \param[in] uuid     uuid or quota index name(0x1020000-OST0001_UUID)
+ * \param[out] idx     pointer to save index
+ *
+ * \retval             slave type(QMT_STYPE_MDT or QMT_STYPE_OST)
+ * \retval -EINVAL     wrong uuid
+ */
+int qmt_uuid2idx(struct obd_uuid *uuid, int *idx)
+{
+       char *uuid_str, *name, *dash;
+       int rc = -EINVAL;
+
+       uuid_str = (char *)uuid->uuid;
+
+       /* refuse a buffer with no terminating NUL before doing any
+        * string scanning on it */
+       if (strnlen(uuid_str, UUID_MAX) >= UUID_MAX) {
+               CERROR("quota: UUID '%.*s' missing trailing NUL: rc = %d\n",
+                      UUID_MAX, uuid_str, rc);
+               return rc;
+       }
+
+       /* Going to get index from MDTXXXX/OSTXXXX. Thus uuid should
+        * have at least 8 bytes after '-': 3 for MDT/OST, 4 for index
+        * and 1 byte for null character. Check for a missing dash first:
+        * strrchr() returns NULL then and must not be dereferenced. */
+       dash = strrchr(uuid_str, '-');
+       if (dash == NULL || (uuid_str + UUID_MAX - (dash + 1)) < 8) {
+               CERROR("quota: wrong UUID format '%s': rc = %d\n",
+                      uuid_str, rc);
+               return rc;
+       }
+       name = dash + 1;
+
+       /* map the LDD service-type flag to the QMT slave type */
+       rc = target_name2index(name, idx, NULL);
+       switch (rc) {
+       case LDD_F_SV_TYPE_MDT:
+               rc = QMT_STYPE_MDT;
+               break;
+       case LDD_F_SV_TYPE_OST:
+               rc = QMT_STYPE_OST;
+               break;
+       default:
+               CERROR("quota: wrong UUID type '%s': rc = %d\n", uuid_str, rc);
+               rc = -EINVAL;
+       }
+
+       RETURN(rc);
+}
+
+/*
  * Handle quota request from slave.
  *
  * \param env  - is the environment passed by the caller
@@ -675,13 +897,12 @@ out:
 static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
                     struct ptlrpc_request *req)
 {
-       struct qmt_device       *qmt = lu2qmt_dev(ld);
-       struct quota_body       *qbody, *repbody;
-       struct obd_uuid         *uuid;
-       struct ldlm_lock        *lock;
-       struct lquota_entry     *lqe;
-       int                      pool_type, qtype;
-       int                      rc;
+       struct qmt_device *qmt = lu2qmt_dev(ld);
+       struct quota_body *qbody, *repbody;
+       struct obd_uuid *uuid;
+       struct ldlm_lock *lock;
+       int rtype, qtype;
+       int rc, idx, stype;
        ENTRY;
 
        qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
@@ -702,6 +923,9 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
        LDLM_LOCK_PUT(lock);
 
        uuid = &req->rq_export->exp_client_uuid;
+       stype = qmt_uuid2idx(uuid, &idx);
+       if (stype < 0)
+               RETURN(stype);
 
        if (req_is_rel(qbody->qb_flags) + req_is_acq(qbody->qb_flags) +
            req_is_preacq(qbody->qb_flags) > 1) {
@@ -747,26 +971,27 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
 
        /* extract quota information from global index FID packed in the
         * request */
-       rc = lquota_extract_fid(&qbody->qb_fid, &pool_type, &qtype);
+       rc = lquota_extract_fid(&qbody->qb_fid, &rtype, &qtype);
        if (rc)
                RETURN(-EINVAL);
 
        /* Find the quota entry associated with the quota id */
-       lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
-                                 &qbody->qb_id);
-       if (IS_ERR(lqe))
-               RETURN(PTR_ERR(lqe));
+       rc = qmt_pool_lqes_lookup(env, qmt, rtype, stype, qtype,
+                                 &qbody->qb_id, NULL, idx);
+       if (rc)
+               RETURN(rc);
 
-       /* process quota request */
-       rc = qmt_dqacq0(env, lqe, qmt, uuid, qbody->qb_flags, qbody->qb_count,
-                       qbody->qb_usage, repbody);
+       rc = qmt_dqacq0(env, qmt, uuid, qbody->qb_flags,
+                       qbody->qb_count, qbody->qb_usage, repbody);
 
        if (lustre_handle_is_used(&qbody->qb_lockh))
                /* return current qunit value only to slaves owning an per-ID
                 * quota lock. For enqueue, the qunit value will be returned in
                 * the LVB */
-                repbody->qb_qunit = lqe->lqe_qunit;
-       lqe_putref(lqe);
+               repbody->qb_qunit = qti_lqes_min_qunit(env);
+       CDEBUG(D_QUOTA, "qmt_dqacq return qb_qunit %llu qb_count %llu\n",
+              repbody->qb_qunit, repbody->qb_count);
+       qti_lqes_fini(env);
        RETURN(rc);
 }
 
index cd68fa8..af51f9d 100644 (file)
@@ -46,6 +46,8 @@ struct qmt_device {
 
        /* service name of this qmt */
        char                    qmt_svname[MAX_OBD_NAME];
+       /* root directory for this qmt */
+       struct dt_object        *qmt_root;
 
        /* Reference to the next device in the side stack
         * The child device is actually the OSD device where we store the quota
@@ -69,8 +71,8 @@ struct qmt_device {
 
        /* List of pools managed by this master target */
        struct list_head         qmt_pool_list;
-       /* rw spinlock to protect pool list */
-       rwlock_t                 qmt_pool_lock;
+       /* rw semaphore to protect pool list */
+       struct rw_semaphore      qmt_pool_lock;
 
        /* procfs root directory for this qmt */
        struct proc_dir_entry   *qmt_proc;
@@ -88,7 +90,28 @@ struct qmt_device {
 
 };
 
+struct qmt_pool_info;
 #define QPI_MAXNAME    (LOV_MAXPOOLNAME + 1)
+#define qmt_pool_global(qpi) \
+       (!strncmp(qpi->qpi_name, GLB_POOL_NAME, \
+                 strlen(GLB_POOL_NAME) + 1) ? true : false)
+/* Draft for mdt pools */
+union qmt_sarray {
+       struct lu_tgt_pool      osts;
+};
+
+/* With DOM support, data resources can exist
+ * on both MDT and OST targets. */
+enum {
+       QMT_STYPE_MDT,
+       QMT_STYPE_OST,
+       QMT_STYPE_CNT
+};
+
+enum {
+       /* set while recalc_thread is working */
+       QPI_FLAG_RECALC_OFFSET,
+};
 
 /*
  * Per-pool quota information.
@@ -104,6 +127,14 @@ struct qmt_pool_info {
        int                      qpi_rtype;
        char                     qpi_name[QPI_MAXNAME];
 
+       union qmt_sarray         qpi_sarr;
+       /* recalculation thread pointer */
+       struct ptlrpc_thread     qpi_recalc_thread;
+       /* rw semaphore to avoid acquire/release during
+        * pool recalculation. */
+       struct rw_semaphore      qpi_recalc_sem;
+       unsigned long            qpi_flags;
+
        /* track users of this pool instance */
        atomic_t                 qpi_ref;
 
@@ -121,7 +152,7 @@ struct qmt_pool_info {
        struct lquota_site      *qpi_site[LL_MAXQUOTAS];
 
        /* number of slaves registered for each quota types */
-       int                      qpi_slv_nr[LL_MAXQUOTAS];
+       int                      qpi_slv_nr[QMT_STYPE_CNT][LL_MAXQUOTAS];
 
        /* reference on lqe (ID 0) storing grace time. */
        struct lquota_entry     *qpi_grace_lqe[LL_MAXQUOTAS];
@@ -154,6 +185,25 @@ struct qmt_pool_info {
        unsigned long            qpi_soft_least_qunit;
 };
 
+/* Return the total number of slaves registered for quota type \a qtype
+ * in \a pool, summed over all slave types (QMT_STYPE_MDT + QMT_STYPE_OST). */
+static inline int qpi_slv_nr(struct qmt_pool_info *pool, int qtype)
+{
+       int i, sum = 0;
+
+       for (i = 0; i < QMT_STYPE_CNT; i++)
+               sum += pool->qpi_slv_nr[i][qtype];
+
+       return sum;
+}
+
+/* Return the slave count for \a qtype according to the pool's resource
+ * type: data pools (LQUOTA_RES_DT) currently count OST slaves only,
+ * otherwise MDT slaves are counted. */
+static inline int qpi_slv_nr_by_rtype(struct qmt_pool_info *pool, int qtype)
+{
+       if (pool->qpi_rtype == LQUOTA_RES_DT)
+               /* Here should be qpi_slv_nr() if MDTs will be added
+                * to quota pools */
+               return pool->qpi_slv_nr[QMT_STYPE_OST][qtype];
+       else
+               return pool->qpi_slv_nr[QMT_STYPE_MDT][qtype];
+}
 /*
  * Helper routines and prototypes
  */
@@ -184,16 +234,44 @@ struct qmt_lqe_restore {
        __u64   qlr_qunit;
 };
 
+#define QMT_MAX_POOL_NUM       16
 /* Common data shared by qmt handlers */
 struct qmt_thread_info {
-       union lquota_rec        qti_rec;
-       union lquota_id         qti_id;
-       char                    qti_buf[MTI_NAME_MAXLEN];
-       struct lu_fid           qti_fid;
-       struct ldlm_res_id      qti_resid;
-       union ldlm_gl_desc      qti_gl_desc;
-       struct quota_body       qti_body;
-       struct qmt_lqe_restore  qti_restore;
+       union lquota_rec         qti_rec;
+       union lquota_id          qti_id;
+       char                     qti_buf[MTI_NAME_MAXLEN];
+       struct lu_fid            qti_fid;
+       struct ldlm_res_id       qti_resid;
+       union ldlm_gl_desc       qti_gl_desc;
+       struct quota_body        qti_body;
+       union {
+               struct qmt_lqe_restore  qti_lqes_rstr_small[QMT_MAX_POOL_NUM];
+               struct qmt_lqe_restore  *qti_lqes_rstr;
+       };
+       union {
+               struct qmt_pool_info    *qti_pools_small[QMT_MAX_POOL_NUM];
+               /* Pointer to an array of qpis in case when
+                * qti_pools_cnt > QMT_MAX_POOL_NUM. */
+               struct qmt_pool_info    **qti_pools;
+       };
+       /* The number of pools in qti_pools */
+       int                      qti_pools_cnt;
+       /* Maximum number of elements in qti_pools array.
+        * By default it is QMT_MAX_POOL_NUM. */
+       int                      qti_pools_num;
+       int                      qti_glbl_lqe_idx;
+       /* The same is for lqe ... */
+       union {
+               struct lquota_entry     *qti_lqes_small[QMT_MAX_POOL_NUM];
+               /* Pointer to an array of lqes in case when
+                * qti_lqes_cnt > QMT_MAX_POOL_NUM. */
+               struct lquota_entry     **qti_lqes;
+       };
+       /* The number of lqes in qti_lqes */
+       int                      qti_lqes_cnt;
+       /* Maximum number of elements in qti_lqes array.
+        * By default it is QMT_MAX_POOL_NUM. */
+       int                      qti_lqes_num;
 };
 
 extern struct lu_context_key qmt_thread_key;
@@ -205,6 +283,21 @@ struct qmt_thread_info *qmt_info(const struct lu_env *env)
        return lu_env_info(env, &qmt_thread_key);
 }
 
+#define qti_lqes_num(env)      (qmt_info(env)->qti_lqes_num)
+#define qti_lqes_cnt(env)      (qmt_info(env)->qti_lqes_cnt)
+#define qti_glbl_lqe_idx(env)  (qmt_info(env)->qti_glbl_lqe_idx)
+#define qti_lqes(env)          (qti_lqes_num(env) > QMT_MAX_POOL_NUM ? \
+                                       qmt_info(env)->qti_lqes : \
+                                       qmt_info(env)->qti_lqes_small)
+#define qti_lqes_rstr(env)     (qti_lqes_num(env) > QMT_MAX_POOL_NUM ? \
+                                       qmt_info(env)->qti_lqes_rstr : \
+                                       qmt_info(env)->qti_lqes_rstr_small)
+#define qti_lqes_glbl(env)     (qti_lqes(env)[qti_glbl_lqe_idx(env)])
+#define qti_lqe_hard(env, i)   (qti_lqes(env)[i]->lqe_hardlimit)
+#define qti_lqe_soft(env, i)   (qti_lqes(env)[i]->lqe_softlimit)
+#define qti_lqe_granted(env, i)        (qti_lqes(env)[i]->lqe_granted)
+#define qti_lqe_qunit(env, i)  (qti_lqes(env)[i]->lqe_qunit)
+
 /* helper routine to convert a lu_device into a qmt_device */
 static inline struct qmt_device *lu2qmt_dev(struct lu_device *ld)
 {
@@ -218,7 +311,7 @@ static inline struct lu_device *qmt2lu_dev(struct qmt_device *qmt)
 }
 
 #define LQE_ROOT(lqe)    (lqe2qpi(lqe)->qpi_root)
-#define LQE_GLB_OBJ(lqe) (lqe2qpi(lqe)->qpi_glb_obj[lqe->lqe_site->lqs_qtype])
+#define LQE_GLB_OBJ(lqe) (lqe2qpi(lqe)->qpi_glb_obj[lqe_qtype(lqe)])
 
 /* helper function returning grace time to use for a given lquota entry */
 static inline __u64 qmt_lqe_grace(struct lquota_entry *lqe)
@@ -226,7 +319,7 @@ static inline __u64 qmt_lqe_grace(struct lquota_entry *lqe)
        struct qmt_pool_info    *pool = lqe2qpi(lqe);
        struct lquota_entry     *grace_lqe;
 
-       grace_lqe = pool->qpi_grace_lqe[lqe->lqe_site->lqs_qtype];
+       grace_lqe = pool->qpi_grace_lqe[lqe_qtype(lqe)];
        LASSERT(grace_lqe != NULL);
 
        return grace_lqe->lqe_gracetime;
@@ -242,6 +335,14 @@ static inline void qmt_restore(struct lquota_entry *lqe,
        lqe->lqe_qunit     = restore->qlr_qunit;
 }
 
+/* Roll back every lqe in the per-thread qti_lqes array to the settings
+ * previously saved in the matching qti_lqes_rstr slot (see qmt_restore()). */
+static inline void qmt_restore_lqes(const struct lu_env *env)
+{
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++)
+               qmt_restore(qti_lqes(env)[i], &qti_lqes_rstr(env)[i]);
+}
+
 #define QMT_GRANT(lqe, slv, cnt)             \
        do {                                 \
                (lqe)->lqe_granted += (cnt); \
@@ -279,6 +380,17 @@ static inline bool qmt_space_exhausted(struct lquota_entry *lqe, __u64 now)
        return (qmt_hard_exhausted(lqe) || qmt_soft_exhausted(lqe, now));
 }
 
+/* Check whether any lqe in the per-thread qti_lqes array has exhausted
+ * its quota space (hard limit hit, or soft limit with expired grace)
+ * as of \a now. */
+static inline bool qmt_space_exhausted_lqes(const struct lu_env *env, __u64 now)
+{
+       int i;
+
+       for (i = 0; i < qti_lqes_cnt(env); i++) {
+               if (qmt_space_exhausted(qti_lqes(env)[i], now))
+                       return true;
+       }
+
+       return false;
+}
+
 /* helper routine clearing the default quota setting  */
 static inline void qmt_lqe_clear_default(struct lquota_entry *lqe)
 {
@@ -292,16 +404,63 @@ static inline void qmt_lqe_clear_default(struct lquota_entry *lqe)
 #define QMT_REBA_TIMEOUT 2
 
 /* qmt_pool.c */
+
+void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
+/*
+ * Reference counter management for qmt_pool_info structures
+ */
+/* Take an extra reference on \a pool. */
+static inline void qpi_getref(struct qmt_pool_info *pool)
+{
+       atomic_inc(&pool->qpi_ref);
+}
+
+/* Release a reference on \a pool; dropping the last reference frees the
+ * pool via qmt_pool_free(). */
+static inline void qpi_putref(const struct lu_env *env,
+                             struct qmt_pool_info *pool)
+{
+       LASSERT(atomic_read(&pool->qpi_ref) > 0);
+       if (atomic_dec_and_test(&pool->qpi_ref))
+               qmt_pool_free(env, pool);
+}
+
+
 void qmt_pool_fini(const struct lu_env *, struct qmt_device *);
 int qmt_pool_init(const struct lu_env *, struct qmt_device *);
 int qmt_pool_prepare(const struct lu_env *, struct qmt_device *,
-                  struct dt_object *);
+                  struct dt_object *, char *);
 int qmt_pool_new_conn(const struct lu_env *, struct qmt_device *,
                      struct lu_fid *, struct lu_fid *, __u64 *,
                      struct obd_uuid *);
+
+#define GLB_POOL_NAME  "0x0"
+#define qmt_pool_lookup_glb(env, qmt, type) \
+               qmt_pool_lookup(env, qmt, type, NULL, -1, false)
+#define qmt_pool_lookup_name(env, qmt, type, name) \
+               qmt_pool_lookup(env, qmt, type, name, -1, false)
+#define qmt_pool_lookup_arr(env, qmt, type, idx) \
+               qmt_pool_lookup(env, qmt, type, NULL, idx, true)
+struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
+                                            struct qmt_device *qmt,
+                                            int rtype,
+                                            char *pool_name,
+                                            int idx,
+                                            bool add);
 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *,
                                         struct qmt_device *, int, int,
-                                        union lquota_id *);
+                                        union lquota_id *, char *);
+int qmt_pool_lqes_lookup(const struct lu_env *, struct qmt_device *, int,
+                        int, int, union lquota_id *, char *, int);
+int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
+                             int rtype, int qtype, union lquota_id *qid);
+void qmt_lqes_sort(const struct lu_env *env);
+int qmt_pool_new(struct obd_device *obd, char *poolname);
+int qmt_pool_add(struct obd_device *obd, char *poolname, char *ostname);
+int qmt_pool_rem(struct obd_device *obd, char *poolname, char *ostname);
+int qmt_pool_del(struct obd_device *obd, char *poolname);
+
+inline struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi);
+inline int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx);
+inline unsigned int qmt_sarr_count(struct qmt_pool_info *qpi);
+
 /* qmt_entry.c */
 extern struct lquota_entry_operations qmt_lqe_ops;
 int qmt_lqe_set_default(const struct lu_env *env, struct qmt_pool_info *pool,
@@ -309,29 +468,63 @@ int qmt_lqe_set_default(const struct lu_env *env, struct qmt_pool_info *pool,
 struct thandle *qmt_trans_start_with_slv(const struct lu_env *,
                                         struct lquota_entry *,
                                         struct dt_object *,
-                                        struct qmt_lqe_restore *);
-struct thandle *qmt_trans_start(const struct lu_env *, struct lquota_entry *,
-                               struct qmt_lqe_restore *);
+                                        bool);
+struct thandle *qmt_trans_start(const struct lu_env *, struct lquota_entry *);
+int qmt_glb_write_lqes(const struct lu_env *, struct thandle *, __u32, __u64 *);
 int qmt_glb_write(const struct lu_env *, struct thandle *,
                  struct lquota_entry *, __u32, __u64 *);
 int qmt_slv_write(const struct lu_env *, struct thandle *,
                  struct lquota_entry *, struct dt_object *, __u32, __u64 *,
                  __u64);
-int qmt_slv_read(const struct lu_env *, struct lquota_entry *,
+int qmt_slv_read(const struct lu_env *,  union lquota_id *,
                 struct dt_object *, __u64 *);
 int qmt_validate_limits(struct lquota_entry *, __u64, __u64);
-void qmt_adjust_qunit(const struct lu_env *, struct lquota_entry *);
-void qmt_adjust_edquot(struct lquota_entry *, __u64);
-void qmt_revalidate(const struct lu_env *, struct lquota_entry *);
+bool qmt_adjust_qunit(const struct lu_env *, struct lquota_entry *);
+bool qmt_adjust_edquot(struct lquota_entry *, __u64);
+
+#define qmt_adjust_edquot_notify(env, qmt, now, qb_flags) \
+         qmt_adjust_edquot_qunit_notify(env, qmt, now, true, false, qb_flags)
+#define qmt_adjust_qunit_notify(env, qmt, qb_flags) \
+         qmt_adjust_edquot_qunit_notify(env, qmt, 0, false, true, qb_flags)
+#define qmt_adjust_and_notify(env, qmt, now, qb_flags) \
+         qmt_adjust_edquot_qunit_notify(env, qmt, now, true, true, qb_flags)
+bool qmt_adjust_edquot_qunit_notify(const struct lu_env *, struct qmt_device *,
+                                   __u64, bool, bool, __u32);
+bool qmt_revalidate(const struct lu_env *, struct lquota_entry *);
+void qmt_revalidate_lqes(const struct lu_env *, struct qmt_device *, __u32);
 __u64 qmt_alloc_expand(struct lquota_entry *, __u64, __u64);
 
+inline void qti_lqes_init(const struct lu_env *);
+inline int qti_lqes_add(const struct lu_env *, struct lquota_entry *);
+inline void qti_lqes_del(const struct lu_env *, int);
+inline void qti_lqes_fini(const struct lu_env *);
+inline int qti_lqes_min_qunit(const struct lu_env *);
+inline int qti_lqes_edquot(const struct lu_env *);
+inline int qti_lqes_restore_init(const struct lu_env *env);
+inline void qti_lqes_restore_fini(const struct lu_env *env);
+inline void qti_lqes_write_lock(const struct lu_env *env);
+inline void qti_lqes_write_unlock(const struct lu_env *env);
+
+struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *, int);
+void qmt_free_lqe_gd(struct lqe_glbl_data *);
+void qmt_setup_lqe_gd(const struct lu_env *,  struct qmt_device *,
+                   struct lquota_entry *, struct lqe_glbl_data *, int);
+#define qmt_seed_glbe_edquot(env, lqeg) \
+               qmt_seed_glbe_all(env, lqeg, false, true)
+#define qmt_seed_glbe_qunit(env, lqeg) \
+               qmt_seed_glbe_all(env, lqeg, true, false)
+#define qmt_seed_glbe(env, lqeg) \
+               qmt_seed_glbe_all(env, lqeg, true, true)
+void qmt_seed_glbe_all(const struct lu_env *, struct lqe_glbl_data *,
+                      bool , bool);
+
 /* qmt_handler.c */
 int qmt_set_with_lqe(const struct lu_env *env, struct qmt_device *qmt,
                     struct lquota_entry *lqe, __u64 hard, __u64 soft,
                     __u64 time, __u32 valid, bool is_default, bool is_updated);
-int qmt_dqacq0(const struct lu_env *, struct lquota_entry *,
-              struct qmt_device *, struct obd_uuid *, __u32, __u64, __u64,
-              struct quota_body *);
+int qmt_dqacq0(const struct lu_env *, struct qmt_device *, struct obd_uuid *,
+              __u32, __u64, __u64, struct quota_body *);
+int qmt_uuid2idx(struct obd_uuid *, int *);
 
 /* qmt_lock.c */
 int qmt_intent_policy(const struct lu_env *, struct lu_device *,
index 5b744ec..69f5eb8 100644 (file)
@@ -89,6 +89,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        case IT_QUOTA_DQACQ: {
                struct lquota_entry     *lqe;
                struct ldlm_lock        *lock;
+               int idx;
 
                if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
                        /* acquire on global lock? something is wrong ... */
@@ -103,15 +104,31 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                        GOTO(out, rc = -ENOLCK);
                LDLM_LOCK_PUT(lock);
 
+               rc = qmt_uuid2idx(uuid, &idx);
+               if (rc < 0)
+                       GOTO(out, rc = -EINVAL);
+
+               /* TODO: it seems we don't need to get lqe from
+                * lq_lvb_data anymore ... And do extra get
+                * and put on it */
                lqe = res->lr_lvb_data;
                LASSERT(lqe != NULL);
                lqe_getref(lqe);
 
+               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+                                         lqe_qtype(lqe), &reqbody->qb_id,
+                                         NULL, idx);
+               if (rc) {
+                       lqe_putref(lqe);
+                       GOTO(out, rc);
+               }
+
                /* acquire quota space */
-               rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
-                               reqbody->qb_count, reqbody->qb_usage,
-                               repbody);
+               rc = qmt_dqacq0(env, qmt, uuid,
+                               reqbody->qb_flags, reqbody->qb_count,
+                               reqbody->qb_usage, repbody);
                lqe_putref(lqe);
+               qti_lqes_fini(env);
                if (rc)
                        GOTO(out, rc);
                break;
@@ -191,16 +208,36 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
                /* no ID quota lock associated with UID/GID 0 or with a seq 0,
                 * we are thus dealing with an ID lock. */
+               struct qmt_pool_info    *pool;
                struct lquota_entry     *lqe;
+               struct lqe_glbl_data    *lgd;
+
+               pool = qmt_pool_lookup_glb(env, qmt, pool_type);
+               if (IS_ERR(pool))
+                       GOTO(out, rc = -ENOMEM);
 
                /* Find the quota entry associated with the quota id */
                lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
-                                         &qti->qti_id);
-               if (IS_ERR(lqe))
+                                         &qti->qti_id, NULL);
+               if (IS_ERR(lqe)) {
+                       qpi_putref(env, pool);
                        GOTO(out, rc = PTR_ERR(lqe));
+               }
+
+               /* TODO: need something like qmt_extend_lqe_gd that has
+                * to be called each time when qpi_slv_nr is incremented */
+               lgd = qmt_alloc_lqe_gd(pool, qtype);
+               if (!lgd) {
+                       lqe_putref(lqe);
+                       qpi_putref(env, pool);
+                       GOTO(out, rc = -ENOMEM);
+               }
+
+               qmt_setup_lqe_gd(env, qmt, lqe, lgd, pool_type);
 
                /* store reference to lqe in lr_lvb_data */
                res->lr_lvb_data = lqe;
+               qpi_putref(env, pool);
                LQUOTA_DEBUG(lqe, "initialized res lvb");
        } else {
                struct dt_object        *obj;
@@ -225,6 +262,57 @@ out:
        return rc;
 }
 
+/* Clear the lge_qunit_nu/lge_edquot_nu flags for slave \a idx -
+ * the slave has received the new qunit and edquot values.
+ *
+ * \retval     true if revoke is needed - qunit
+ *             for this slave reached least_qunit
+ */
+static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
+{
+       unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+
+       /* There is no array to store lge for the case of DOM.
+        * Ignore it until MDT pools will be ready. */
+       if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) {
+               lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
+               lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
+
+               /* We shouldn't call revoke for DOM case, it will be updated
+                * at qmt_id_lock_glimpse. */
+               return (lgd->lqeg_arr[idx].lge_qunit == least_qunit);
+       }
+
+       return false;
+}
+
+/* Set lqe_revoke_time and re-adjust edquot for every lqe in the
+ * per-thread qti_lqes array whose qunit already equals the pool's
+ * least_qunit - slave \a idx cannot release more space by further
+ * qunit shrinking.
+ * NOTE(review): iterates qti_lqes(env), so qmt_pool_lqes_lookup()
+ * must have populated it before this call - confirm against callers. */
+static void qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe,
+                         int stype, int idx)
+{
+       unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+
+       if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) {
+               int i;
+
+               qti_lqes_write_lock(env);
+               for (i = 0; i < qti_lqes_cnt(env); i++) {
+                       LQUOTA_DEBUG(qti_lqes(env)[i],
+                                    "idx %d lge_qunit %llu least_qunit %lu\n",
+                                    idx, lgd->lqeg_arr[idx].lge_qunit,
+                                    least_qunit);
+                       if (qti_lqes(env)[i]->lqe_qunit == least_qunit) {
+                               qti_lqes(env)[i]->lqe_revoke_time =
+                                                       ktime_get_seconds();
+                               qmt_adjust_edquot(qti_lqes(env)[i],
+                                                 ktime_get_real_seconds());
+                       }
+               }
+               qti_lqes_write_unlock(env);
+       }
+}
+
 /*
  * Update LVB associated with the global quota index.
  * This function is called from the DLM itself after a glimpse callback, in this
@@ -240,7 +328,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        struct lquota_lvb       *lvb;
        struct ldlm_lock        *lock;
        struct obd_export       *exp;
-       int                      rc = 0;
+       bool                     need_revoke;
+       int                      rc = 0, idx;
        ENTRY;
 
        LASSERT(res != NULL);
@@ -264,17 +353,6 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        LASSERT(lqe != NULL);
        lqe_getref(lqe);
 
-       LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
-                    lvb->lvb_id_rel, lvb->lvb_id_may_rel);
-
-       if (lvb->lvb_id_rel == 0) {
-               /* nothing to release */
-               if (lvb->lvb_id_may_rel != 0)
-                       /* but might still release later ... */
-                       lqe->lqe_may_rel += lvb->lvb_id_may_rel;
-               GOTO(out, rc = 0);
-       }
-
        /* allocate environement */
        env = lu_env_find();
        LASSERT(env);
@@ -301,17 +379,49 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                GOTO(out, rc = -EFAULT);
        }
 
-       /* release quota space */
-       rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
-                       QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
-       if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
-               LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
-                            "%llu!=%llu : rc = %d\n", qti->qti_body.qb_count,
-                            lvb->lvb_id_rel, rc);
-       class_export_put(exp);
+       rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
+       if (rc < 0)
+               GOTO(out_exp, rc);
+
+       need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx);
+       if (lvb->lvb_id_rel == 0) {
+               /* nothing to release */
+               if (lvb->lvb_id_may_rel != 0)
+                       /* but might still release later ... */
+                       lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+       }
+
+       if (!need_revoke && lvb->lvb_id_rel == 0)
+               GOTO(out_exp, rc = 0);
+
+       rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe),
+                                 &lqe->lqe_id, NULL, idx);
        if (rc)
-               GOTO(out, rc);
+               GOTO(out_exp, rc);
+
+       if (need_revoke)
+               qmt_set_revoke(env, lqe, rc, idx);
+
+       if (lvb->lvb_id_rel) {
+               LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
+                            lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+               /* release quota space */
+               rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
+                               QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
+                               0, &qti->qti_body);
+               if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+                       LQUOTA_ERROR(lqe,
+                                    "failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
+                                    qti->qti_body.qb_count,
+                                    lvb->lvb_id_rel, rc);
+       }
+       qti_lqes_fini(env);
+       if (rc)
+               GOTO(out_exp, rc);
        EXIT;
+out_exp:
+       class_export_put(exp);
 out:
        lqe_putref(lqe);
        return rc;
@@ -333,37 +443,60 @@ int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
                  int lvblen)
 {
-       struct ldlm_resource    *res = lock->l_resource;
-       struct lquota_lvb       *qlvb = lvb;
+       struct ldlm_resource *res = lock->l_resource;
+       struct lquota_lvb *qlvb = lvb;
+       struct lu_env *env;
+       int rc;
        ENTRY;
 
        LASSERT(res != NULL);
+       rc = 0;
 
        if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
            res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
                RETURN(-EINVAL);
 
+       env = lu_env_find();
+       LASSERT(env);
+
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
                /* no ID quota lock associated with UID/GID 0 or with a seq 0,
                 * we are thus dealing with an ID lock. */
-               struct lquota_entry     *lqe = res->lr_lvb_data;
-
+               struct lquota_entry *lqe = res->lr_lvb_data;
+               struct qmt_device *qmt;
+               struct obd_uuid *uuid;
+               int idx;
+
+               uuid = &(lock)->l_export->exp_client_uuid;
+               rc = qmt_uuid2idx(uuid, &idx);
+               if (rc < 0)
+                       RETURN(rc);
+               qmt = lu2qmt_dev(ld);
                /* return current qunit value & edquot flags in lvb */
                lqe_getref(lqe);
-               qlvb->lvb_id_qunit = lqe->lqe_qunit;
-               qlvb->lvb_flags = 0;
-               if (lqe->lqe_edquot)
-                       qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+                                         lqe_qtype(lqe), &lqe->lqe_id,
+                                         NULL, idx);
+               if (!rc) {
+                       qlvb->lvb_id_qunit = qti_lqes_min_qunit(env);
+                       qlvb->lvb_flags = 0;
+                       if (qti_lqes_edquot(env))
+                               qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+                       qti_lqes_fini(env);
+               }
+               CDEBUG(D_QUOTA, "uuid %s lqe_id %lu, edquot %llu qunit %llu\n",
+                      (char *)uuid, (unsigned long)lqe->lqe_id.qid_uid,
+                      qlvb->lvb_flags, qlvb->lvb_id_qunit);
                lqe_putref(lqe);
        } else {
                /* global quota lock */
-               struct dt_object *obj = res->lr_lvb_data;
+               struct dt_object        *obj = res->lr_lvb_data;
 
                /* return current version of global index */
-               qlvb->lvb_glb_ver = dt_version_get(lu_env_find(), obj);
+               qlvb->lvb_glb_ver = dt_version_get(env, obj);
        }
 
-       RETURN(sizeof(struct lquota_lvb));
+       RETURN(rc = rc ?: sizeof(struct lquota_lvb));
 }
 
 /*
@@ -379,10 +512,13 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
                RETURN(0);
 
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
-               struct lquota_entry     *lqe = res->lr_lvb_data;
+               struct lquota_entry *lqe = res->lr_lvb_data;
+               struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
 
                /* release lqe reference */
+               lqe->lqe_glbl_data = NULL;
                lqe_putref(lqe);
+               qmt_free_lqe_gd(lgd);
        } else {
                struct dt_object *obj = res->lr_lvb_data;
                /* release object reference */
@@ -395,7 +531,7 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
        RETURN(0);
 }
 
-typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *);
+typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, struct lquota_entry *);
 
 struct qmt_gl_lock_array {
        unsigned long             q_max;
@@ -481,6 +617,35 @@ again:
        RETURN(0);
 }
 
+/* Fill glimpse descriptor \a desc with the qunit/edquot settings to push
+ * to the slave holding \a lock. For the DOM case (an MDT registered as an
+ * LQUOTA_RES_DT device) there is no per-slave lge array, so the global
+ * lqe settings are used instead. */
+void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
+                      struct lquota_entry *lqe)
+{
+       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+       int idx, stype;
+       __u64 qunit;
+       bool edquot;
+
+       stype = qmt_uuid2idx(uuid, &idx);
+       LASSERT(stype >= 0);
+
+       /* DOM case - set global lqe settings */
+       if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) {
+               edquot = lqe->lqe_edquot;
+               qunit = lqe->lqe_qunit;
+       } else {
+               edquot = lgd->lqeg_arr[idx].lge_edquot;
+               qunit = lgd->lqeg_arr[idx].lge_qunit;
+       }
+
+       /* fill glimpse descriptor with lqe settings */
+       desc->lquota_desc.gl_flags = edquot ? LQUOTA_FL_EDQUOT : 0;
+       desc->lquota_desc.gl_qunit = qunit;
+       CDEBUG(D_QUOTA, "setup desc: stype %d idx %d, edquot %llu qunit %llu\n",
+                        stype, idx, desc->lquota_desc.gl_flags,
+                        desc->lquota_desc.gl_qunit);
+}
+
 /*
  * Send glimpse callback to slaves holding a lock on resource \res.
  * This is used to notify slaves of new quota settings or to claim quota space
@@ -496,22 +661,50 @@ again:
  */
 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                            struct ldlm_resource *res, union ldlm_gl_desc *desc,
-                           qmt_glimpse_cb_t cb, void *arg)
+                           qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
 {
+       union ldlm_gl_desc *descs = NULL;
+       struct lqe_glbl_data *gld;
        struct list_head *tmp, *pos;
        LIST_HEAD(gl_list);
        struct qmt_gl_lock_array locks;
-       unsigned long i;
+       unsigned long i, locks_count;
        int rc = 0;
        ENTRY;
 
+       gld = lqe ? lqe->lqe_glbl_data : NULL;
        memset(&locks, 0, sizeof(locks));
-       rc = qmt_alloc_lock_array(res, &locks, cb, arg);
+       rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
        if (rc) {
                CERROR("%s: failed to allocate glimpse lock array (%d)\n",
                       qmt->qmt_svname, rc);
                RETURN(rc);
        }
+       if (!locks.q_cnt) {
+               CDEBUG(D_QUOTA, "%s: no granted locks to send glimpse\n",
+                      qmt->qmt_svname);
+               RETURN(0);
+       }
+       CDEBUG(D_QUOTA, "found granted locks %lu\n", locks.q_cnt);
+       locks_count = locks.q_cnt;
+
+       /* Use one desc for all works, when called from qmt_glb_lock_notify */
+       if (gld && locks.q_cnt > 1) {
+               /* TODO: think about to store this preallocated descs
+                * in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
+                * The benefit is that we don't need to allocate/free
+                * and setup this descs each time. But the drawback is
+                * memory use (sizeof ldlm_gl_desc * OST_COUNT * user_number).
+                * for examfple it could be 88 * 256 * 10 000 about 225 MB. */
+               OBD_ALLOC(descs,
+                         sizeof(struct ldlm_gl_lquota_desc) * locks.q_cnt);
+               if (!descs) {
+                       CERROR("%s: alloc glimpse lock array failed: rc = %d\n",
+                              qmt->qmt_svname, rc);
+                       qmt_free_lock_array(&locks);
+                       RETURN(-ENOMEM);
+               }
+       }
 
        for (i = locks.q_cnt; i > 0; i--) {
                struct ldlm_glimpse_work *work;
@@ -523,6 +716,13 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                        continue;
                }
 
+               if (gld) {
+                       if (descs)
+                               desc = &descs[i - 1];
+                       qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
+                       work->gl_interpret_data = lqe;
+               }
+
                list_add_tail(&work->gl_list, &gl_list);
                work->gl_lock  = locks.q_locks[i - 1];
                work->gl_flags = 0;
@@ -536,7 +736,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
 
        if (list_empty(&gl_list)) {
                CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
-               RETURN(0);
+               GOTO(out, rc = 0);
        }
 
        /* issue glimpse callbacks to all connected slaves */
@@ -554,6 +754,10 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                LDLM_LOCK_RELEASE(work->gl_lock);
                OBD_FREE_PTR(work);
        }
+out:
+       if (descs)
+               OBD_FREE(descs,
+                        sizeof(struct ldlm_gl_lquota_desc) * locks_count);
 
        RETURN(rc);
 }
@@ -574,8 +778,7 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
        struct ldlm_resource    *res = NULL;
        ENTRY;
 
-       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
-                           lqe->lqe_site->lqs_qtype);
+       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
 
        /* send glimpse callback to notify slaves of new quota settings */
        qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
@@ -613,16 +816,25 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
 
 /* Callback function used to select locks that should be glimpsed when
  * broadcasting the new qunit value */
-static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg)
+static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
 {
-       struct obd_uuid *slv_uuid = arg;
-       struct obd_uuid *uuid = &lock->l_export->exp_client_uuid;
+       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+       int idx;
+       int stype = qmt_uuid2idx(uuid, &idx);
 
-       if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
-               RETURN(0);
-       RETURN(+1);
+       LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
+
+       /* Quota pools support only OSTs, despite MDTs also could be registered
+        * as LQUOTA_RES_DT devices(DOM). */
+       if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+               return 1;
+       else
+               return lgd->lqeg_arr[idx].lge_edquot_nu ||
+                      lgd->lqeg_arr[idx].lge_qunit_nu;
 }
 
+
 /*
  * Send glimpse request on per-ID lock to push new qunit value to slave.
  *
@@ -643,8 +855,7 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        if (!lqe->lqe_enforced)
                RETURN_EXIT;
 
-       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
-                           lqe->lqe_site->lqs_qtype);
+       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
        fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
        res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
                                0);
@@ -668,15 +879,14 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
         * - notify slaves that master ran out of quota space and there is no
         *   need to send acquire request any more until further notice */
 
-       /* fill glimpse descriptor with lqe settings */
-       if (lqe->lqe_edquot)
-               qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
-       else
-               qti->qti_gl_desc.lquota_desc.gl_flags = 0;
-       qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
-
+       /* TODO: it is not clear how to implement below case for all lqes
+        * from where slaves will be notified in qmt_glimpse_lock. Because
+        * here we have just global lqe with an array of OSTs that should
+        * be notified. Theoretically we can find all lqes that includes
+        * these OSTs, but it is not trivial. So I would propose to move
+        * this case to another place ... */
        if (lqe->lqe_revoke_time == 0 &&
-           qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
+           lqe->lqe_qunit == pool->qpi_least_qunit)
                /* reset lqe_may_rel, it will be updated on glimpse callback
                 * replies if needed */
                lqe->lqe_may_rel = 0;
@@ -688,11 +898,10 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
 
        /* issue glimpse callback to slaves */
        qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
-                        uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
+                        qmt_id_lock_cb, lqe);
 
        lqe_write_lock(lqe);
        if (lqe->lqe_revoke_time == 0 &&
-           qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
            lqe->lqe_qunit == pool->qpi_least_qunit) {
                lqe->lqe_revoke_time = ktime_get_seconds();
                qmt_adjust_edquot(lqe, ktime_get_real_seconds());
@@ -717,6 +926,7 @@ void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
        bool    added = false;
        ENTRY;
 
+       LASSERT(lqe->lqe_is_global);
        lqe_getref(lqe);
        spin_lock(&qmt->qmt_reba_lock);
        if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
index 71e5a52..93d2d44 100644 (file)
 #include <lprocfs_status.h>
 #include "qmt_internal.h"
 
-static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
+                                   int idx, int min);
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
+static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi);
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
 
 /*
  * Static helper functions not used outside the scope of this file
  */
 
-/*
- * Reference counter management for qmt_pool_info structures
- */
-static inline void qpi_getref(struct qmt_pool_info *pool)
-{
-       atomic_inc(&pool->qpi_ref);
-}
-
-static inline void qpi_putref(const struct lu_env *env,
-                             struct qmt_pool_info *pool)
-{
-       LASSERT(atomic_read(&pool->qpi_ref) > 0);
-       if (atomic_dec_and_test(&pool->qpi_ref))
-               qmt_pool_free(env, pool);
-}
-
 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
 {
        LASSERT(atomic_read(&pool->qpi_ref) > 1);
@@ -105,7 +95,7 @@ static int qpi_state_seq_show(struct seq_file *m, void *data)
                           "        #slv: %d\n"
                           "        #lqe: %d\n",
                           qtype_name(type),
-                          pool->qpi_slv_nr[type],
+                          qpi_slv_nr(pool, type),
                    atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
 
        return 0;
@@ -184,6 +174,9 @@ static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
        if (pool == NULL)
                RETURN(-ENOMEM);
        INIT_LIST_HEAD(&pool->qpi_linkage);
+       init_waitqueue_head(&pool->qpi_recalc_thread.t_ctl_waitq);
+       thread_set_flags(&pool->qpi_recalc_thread, SVC_STOPPED);
+       init_rwsem(&pool->qpi_recalc_sem);
 
        pool->qpi_rtype = pool_type;
 
@@ -217,10 +210,14 @@ static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
                GOTO(out, rc);
        }
 
+       rc = qmt_sarr_pool_init(pool);
+       if (rc)
+               GOTO(out, rc);
+
        /* add to qmt pool list */
-       write_lock(&qmt->qmt_pool_lock);
+       down_write(&qmt->qmt_pool_lock);
        list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
-       write_unlock(&qmt->qmt_pool_lock);
+       up_write(&qmt->qmt_pool_lock);
        EXIT;
 out:
        if (rc)
@@ -235,20 +232,23 @@ out:
  * \param env  - is the environment passed by the caller
  * \param pool - is the qmt_pool_info structure to free
  */
-static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
+void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
 {
        struct  qmt_device *qmt = pool->qpi_qmt;
        int     qtype;
        ENTRY;
 
        /* remove from list */
-       write_lock(&qmt->qmt_pool_lock);
+       down_write(&qmt->qmt_pool_lock);
        list_del_init(&pool->qpi_linkage);
-       write_unlock(&qmt->qmt_pool_lock);
+       up_write(&qmt->qmt_pool_lock);
 
        if (atomic_read(&pool->qpi_ref) > 0)
                RETURN_EXIT;
 
+       qmt_stop_pool_recalc(pool);
+       qmt_sarr_pool_free(pool);
+
        /* release proc entry */
        if (pool->qpi_proc) {
                lprocfs_remove(&pool->qpi_proc);
@@ -280,8 +280,8 @@ static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
        if (pool->qpi_qmt != NULL) {
                struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
 
-               lu_device_put(ld);
                lu_ref_del(&ld->ld_reference, "pool", pool);
+               lu_device_put(ld);
                pool->qpi_qmt = NULL;
        }
 
@@ -289,42 +289,155 @@ static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
        OBD_FREE_PTR(pool);
 }
 
+/* Reset the per-thread pool array bookkeeping: no pools collected yet,
+ * capacity is the embedded qti_pools_small array (QMT_MAX_POOL_NUM).
+ * Must be paired with qti_pools_fini() once pools have been added. */
+static inline void qti_pools_init(const struct lu_env *env)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+
+       qti->qti_pools_cnt = 0;
+       qti->qti_pools_num = QMT_MAX_POOL_NUM;
+}
+
+#define qti_pools(qti) (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
+                               qti->qti_pools : qti->qti_pools_small)
+#define qti_pools_env(env) \
+       (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
+               qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
+#define qti_pools_cnt(env)     (qmt_info(env)->qti_pools_cnt)
+
+/* Add pool \a qpi to the per-thread pool array, doubling the array when
+ * it outgrows the embedded storage. Takes a qpi reference and a read
+ * lock on qpi_recalc_sem for each added pool (protecting the pool's
+ * lqes from the recalculation thread); both are dropped in
+ * qti_pools_fini(). The global pool is always kept at index 0.
+ *
+ * NOTE(review): the grow check "qti_pools_cnt > qti_pools_num" looks off
+ * by one - when cnt == num the array is already full, yet the store at
+ * pools[qti_pools_cnt++] below would write one element past the end;
+ * ">=" appears intended - confirm.
+ *
+ * \retval 0 on success, -ENOMEM if the array cannot be grown
+ */
+static inline int qti_pools_add(const struct lu_env *env,
+                               struct qmt_pool_info *qpi)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    **pools = qti->qti_pools;
+
+       pools = qti_pools(qti);
+       LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
+                "Forgot init? %p\n", qti);
+
+       if (qti->qti_pools_cnt > qti->qti_pools_num) {
+               OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
+               if (!pools)
+                       return -ENOMEM;
+               memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
+               /* Don't need to free, if it is the very 1st allocation */
+               if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
+                       OBD_FREE(qti->qti_pools,
+                                qti->qti_pools_num * sizeof(qpi));
+               qti->qti_pools = pools;
+               qti->qti_pools_num *= 2;
+       }
+
+       qpi_getref(qpi);
+       /* Take this to protect pool's lqes against changing by
+        * recalculation thread. This would be unlocked at
+        * qti_pools_fini. */
+       down_read(&qpi->qpi_recalc_sem);
+       if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
+               pools[qti->qti_pools_cnt++] = pools[0];
+               /* Store global pool always at index 0 */
+               pools[0] = qpi;
+       } else {
+               pools[qti->qti_pools_cnt++] = qpi;
+       }
+
+       CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
+              qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
+
+       return 0;
+}
+
+/* Release everything taken by qti_pools_add(): drop the qpi_recalc_sem
+ * read lock and the reference on each collected pool, and free the pool
+ * array if it was grown beyond the embedded qti_pools_small storage.
+ * NOTE(review): LASSERT(qti_pools_cnt > 0) requires at least one
+ * successful qti_pools_add() before this is called - confirm callers. */
+static inline void qti_pools_fini(const struct lu_env *env)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    **pools = qti->qti_pools;
+       int i;
+
+       LASSERT(qti->qti_pools_cnt > 0);
+
+       pools = qti_pools(qti);
+       for (i = 0; i < qti->qti_pools_cnt; i++) {
+               up_read(&pools[i]->qpi_recalc_sem);
+               qpi_putref(env, pools[i]);
+       }
+
+       if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
+               OBD_FREE(qti->qti_pools,
+                        qti->qti_pools_num * sizeof(struct qmt_pool_info *));
+}
+
 /*
  * Look-up a pool in a list based on the type.
  *
- * \param env     - is the environment passed by the caller
- * \param qmt     - is the quota master target
- * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
+ * \param env  - is the environment passed by the caller
+ * \param qmt  - is the quota master target
+ * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
  *                    LQUOTA_RES_DT.
+ * \param pool_name - is the pool name to search for
+ * \param idx  - OST or MDT index to search for. When it is >= 0, function
+ *             returns array with pointers to all pools that include
+ *             targets with requested index.
+ * \param add  - add to qti_pool_arr if true
  */
-static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
+struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
                                             struct qmt_device *qmt,
-                                            int pool_type)
+                                            int rtype,
+                                            char *pool_name,
+                                            int idx, bool add)
 {
        struct qmt_pool_info    *pos, *pool;
+       int rc;
        ENTRY;
 
-       read_lock(&qmt->qmt_pool_lock);
+       down_read(&qmt->qmt_pool_lock);
        if (list_empty(&qmt->qmt_pool_list)) {
-               read_unlock(&qmt->qmt_pool_lock);
+               up_read(&qmt->qmt_pool_lock);
                RETURN(ERR_PTR(-ENOENT));
        }
 
+       CDEBUG(D_QUOTA, "type %d name %p index %d\n",
+              rtype, pool_name, idx);
        /* Now just find a pool with correct type in a list. Further we need
         * to go through the list and find a pool that includes requested OST
         * or MDT. Possibly this would return a list of pools that includes
         * needed target(OST/MDT). */
        pool = NULL;
+       if (idx == -1 && !pool_name)
+               pool_name = GLB_POOL_NAME;
+
        list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
-               if (pos->qpi_rtype == pool_type) {
+               if (pos->qpi_rtype != rtype)
+                       continue;
+
+               if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
+                       rc = qti_pools_add(env, pos);
+                       if (rc)
+                               GOTO(out_err, rc);
+                       continue;
+               }
+
+               if (pool_name && !strncmp(pool_name, pos->qpi_name,
+                                         LOV_MAXPOOLNAME)) {
                        pool = pos;
-                       qpi_getref(pool);
+                       if (add) {
+                               rc = qti_pools_add(env, pos);
+                               if (rc)
+                                       GOTO(out_err, rc);
+                       } else {
+                               qpi_getref(pool);
+                       }
                        break;
                }
        }
-       read_unlock(&qmt->qmt_pool_lock);
+       up_read(&qmt->qmt_pool_lock);
 
-       RETURN(pool);
+       if (idx >= 0 && qti_pools_cnt(env))
+               pool = qti_pools_env(env)[0];
+
+       RETURN(pool ? : ERR_PTR(-ENOENT));
+out_err:
+       CERROR("%s: cannot add pool %s: err = %d\n",
+               qmt->qmt_svname, pos->qpi_name, rc);
+       RETURN(ERR_PTR(rc));
 }
 
 /*
@@ -370,13 +483,13 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
        ENTRY;
 
        INIT_LIST_HEAD(&qmt->qmt_pool_list);
-       rwlock_init(&qmt->qmt_pool_lock);
+       init_rwsem(&qmt->qmt_pool_lock);
 
        /* Instantiate pool master for the default data and metadata pool.
         * This code will have to be revisited once we support quota on
         * non-default pools */
        for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
-               rc = qmt_pool_alloc(env, qmt, "0x0", res);
+               rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
                if (rc)
                        break;
        }
@@ -390,10 +503,22 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
+/* lquota_disk_for_each_slv() callback: count one on-disk slave index.
+ * The quota type is extracted from the global index FID and the slave
+ * type (MDT/OST) from the slave file name, so \a arg points to a
+ * [QMT_STYPE_CNT][LL_MAXQUOTAS] counter array rather than a plain int.
+ * NOTE(review): a negative qmt_uuid2idx() result is propagated and
+ * aborts the whole scan — confirm that is intended for unrecognized
+ * slave names. */
 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
                        char *slv_name, struct lu_fid *slv_fid, void *arg)
 {
-       int *nr = arg;
-
+       struct obd_uuid uuid;
+       int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
+       int stype, qtype;
+       int rc;
+
+       rc = lquota_extract_fid(glb_fid, NULL, &qtype);
+       LASSERT(!rc);
+
+       obd_str2uuid(&uuid, slv_name);
+       stype = qmt_uuid2idx(&uuid, NULL);
+       if (stype < 0)
+               return stype;
        /* one more slave */
-       (*nr)++;
+       (*nr)[stype][qtype]++;
+       CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
+                       slv_name, stype, qtype, (*nr)[stype][qtype]);
 
        return 0;
 }
@@ -405,11 +530,13 @@ static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
  * \param qmt - is the quota master target for which we have to initialize the
  *              pool configuration
  * \param qmt_root - is the on-disk directory created for the QMT.
+ * \param name - is the pool name that we need to setup. Setup all pools
+ *              in qmt_pool_list when name is NULL.
  *
  * \retval - 0 on success, appropriate error on failure
  */
 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
-                    struct dt_object *qmt_root)
+                    struct dt_object *qmt_root, char *name)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
        struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
@@ -417,7 +544,7 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
        struct dt_device        *dev = NULL;
        dt_obj_version_t         version;
        struct list_head        *pos;
-       int                      rc = 0, qtype;
+       int                      rc = 0, i, qtype;
        ENTRY;
 
        /* iterate over each pool in the list and allocate a quota site for each
@@ -426,19 +553,21 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                struct dt_object        *obj;
                struct lquota_entry     *lqe;
                char                    *pool_name;
-               int                      pool_type;
+               int                      rtype;
 
                pool = list_entry(pos, struct qmt_pool_info,
                                  qpi_linkage);
 
                pool_name = pool->qpi_name;
-               pool_type = pool->qpi_rtype;
+               if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
+                       continue;
+               rtype = pool->qpi_rtype;
                if (dev == NULL)
                        dev = pool->qpi_qmt->qmt_child;
 
                /* allocate directory for this pool */
                snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
-                        RES_NAME(pool_type), pool_name);
+                        RES_NAME(rtype), pool_name);
                obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
                                                  qti->qti_buf);
                if (IS_ERR(obj))
@@ -448,13 +577,16 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
                        /* Generating FID of global index in charge of storing
                         * settings for this quota type */
-                       lquota_generate_fid(&qti->qti_fid, pool_type, qtype);
+                       lquota_generate_fid(&qti->qti_fid, rtype, qtype);
 
                        /* open/create the global index file for this quota
-                        * type */
+                        * type. If name is set, it means we came here from
+                        * qmt_pool_new and can create glb index with a
+                        * local generated FID. */
                        obj = lquota_disk_glb_find_create(env, dev,
                                                          pool->qpi_root,
-                                                         &qti->qti_fid, false);
+                                                         &qti->qti_fid,
+                                                         name ? true : false);
                        if (IS_ERR(obj)) {
                                rc = PTR_ERR(obj);
                                CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
@@ -470,7 +602,7 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                                rec->qbr_hardlimit = 0;
                                rec->qbr_softlimit = 0;
                                rec->qbr_granted = 0;
-                               rec->qbr_time = pool_type == LQUOTA_RES_MD ?
+                               rec->qbr_time = rtype == LQUOTA_RES_MD ?
                                        MAX_IQ_TIME : MAX_DQ_TIME;
 
                                rc = lquota_disk_write_glb(env, obj, 0, rec);
@@ -501,11 +633,13 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
 
                        /* count number of slaves which already connected to
                         * the master in the past */
-                       pool->qpi_slv_nr[qtype] = 0;
+                       for (i = 0; i < QMT_STYPE_CNT; i++)
+                               pool->qpi_slv_nr[i][qtype] = 0;
+
                        rc = lquota_disk_for_each_slv(env, pool->qpi_root,
                                                      &qti->qti_fid,
                                                      qmt_slv_cnt,
-                                                     &pool->qpi_slv_nr[qtype]);
+                                                     &pool->qpi_slv_nr);
                        if (rc) {
                                CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
                                       qmt->qmt_svname, qtype_name(qtype), rc);
@@ -535,6 +669,8 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                                      qmt->qmt_svname, PFID(&qti->qti_fid), rc);
 #endif
                }
+               if (name)
+                       break;
        }
 
        RETURN(0);
@@ -560,9 +696,13 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
 {
        struct qmt_pool_info    *pool;
        struct dt_object        *slv_obj;
-       int                      pool_type, qtype;
+       int                      pool_type, qtype, stype;
        bool                     created = false;
-       int                      rc = 0;
+       int                      idx, i, rc = 0;
+
+       stype = qmt_uuid2idx(uuid, &idx);
+       if (stype < 0)
+               RETURN(stype);
 
        /* extract pool info from global index FID */
        rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
@@ -570,7 +710,8 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
                RETURN(rc);
 
        /* look-up pool in charge of this global index FID */
-       pool = qmt_pool_lookup(env, qmt, pool_type);
+       qti_pools_init(env);
+       pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
        if (IS_ERR(pool))
                RETURN(PTR_ERR(pool));
 
@@ -596,9 +737,10 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
        *slv_ver = dt_version_get(env, slv_obj);
        dt_object_put(env, slv_obj);
        if (created)
-               pool->qpi_slv_nr[qtype]++;
+               for (i = 0; i < qti_pools_cnt(env); i++)
+                       qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
 out:
-       qpi_putref(env, pool);
+       qti_pools_fini(env);
        RETURN(rc);
 }
 
@@ -618,16 +760,17 @@ out:
 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
                                         struct qmt_device *qmt,
                                         int pool_type, int qtype,
-                                        union lquota_id *qid)
+                                        union lquota_id *qid,
+                                        char *pool_name)
 {
        struct qmt_pool_info    *pool;
        struct lquota_entry     *lqe;
        ENTRY;
 
        /* look-up pool responsible for this global index FID */
-       pool = qmt_pool_lookup(env, qmt, pool_type);
+       pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
        if (IS_ERR(pool))
-               RETURN((void *)pool);
+               RETURN(ERR_CAST(pool));
 
        if (qid->qid_uid == 0) {
                /* caller wants to access grace time, no need to look up the
@@ -644,3 +787,825 @@ out:
        qpi_putref(env, pool);
        RETURN(lqe);
 }
+
+/*
+ * Collect into the per-thread qti_lqes array the lquota entries for
+ * \a qid from every pool matching the request (by slave index \a idx
+ * through qmt_pool_lookup_arr). On success the array stays populated
+ * for the caller, which is responsible for releasing it with
+ * qti_lqes_fini().
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+int qmt_pool_lqes_lookup(const struct lu_env *env,
+                        struct qmt_device *qmt,
+                        int rtype, int stype,
+                        int qtype, union lquota_id *qid,
+                        char *pool_name, int idx)
+{
+       struct qmt_pool_info    *pool;
+       struct lquota_entry     *lqe;
+       int rc, i;
+       ENTRY;
+
+       /* Until MDT pools are implemented, all MDTs belong to the
+        * global pool, thus lookup lqes only from the global pool. */
+       if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+               idx = -1;
+
+       qti_pools_init(env);
+       rc = 0;
+       /* look-up pool responsible for this global index FID */
+       pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
+       if (IS_ERR(pool)) {
+               qti_pools_fini(env);
+               RETURN(PTR_ERR(pool));
+       }
+
+       /* now that we have the pool, let's look-up the quota entry in the
+        * right quota site */
+       qti_lqes_init(env);
+       for (i = 0; i < qti_pools_cnt(env); i++) {
+               pool = qti_pools_env(env)[i];
+               lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
+               if (IS_ERR(lqe)) {
+                       qti_lqes_fini(env);
+                       GOTO(out, rc = PTR_ERR(lqe));
+               }
+               /* Only release could be done for not enforced lqe
+                * (see qmt_dqacq0). However slave could request to
+                * release more than not global lqe had granted before
+                * lqe_enforced was cleared. It is legal case,
+                * because even if current lqe is not enforced,
+                * lqes from other pools are still active and available
+                * for acquiring. Furthermore, skip not enforced lqe
+                * to avoid extra allocations. */
+               /*if (!lqe_is_glbl(lqe) && !lqe->lqe_enforced) {
+                       lqe_putref(lqe);
+                       continue;
+               }*/
+               qti_lqes_add(env, lqe);
+       }
+       /* the global pool is always part of the set, so a global lqe
+        * must have been added above */
+       LASSERT(qti_lqes_glbl(env)->lqe_is_global);
+
+out:
+       qti_pools_fini(env);
+       RETURN(rc);
+}
+
+/* Comparator for sort()ing the qti_lqes array by ascending lqe_qunit.
+ *
+ * The array holds lquota_entry POINTERS (sort() is invoked with an
+ * element size of sizeof(void *)), so the callback arguments point to
+ * the pointers themselves and must be dereferenced once more before
+ * use. Return the conventional <0/0/>0 three-way result expected by
+ * the kernel's sort(); a 0/1 boolean does not implement a valid
+ * ordering. */
+static int lqes_cmp(const void *arg1, const void *arg2)
+{
+       const struct lquota_entry *lqe1, *lqe2;
+
+       lqe1 = *(const struct lquota_entry **)arg1;
+       lqe2 = *(const struct lquota_entry **)arg2;
+       if (lqe1->lqe_qunit < lqe2->lqe_qunit)
+               return -1;
+       if (lqe1->lqe_qunit > lqe2->lqe_qunit)
+               return 1;
+       return 0;
+}
+
+/* Sort the per-thread lqes array by ascending qunit (see lqes_cmp) and
+ * then refresh the cached index of the global-pool lqe, since the sort
+ * may have moved it away from its previous slot. */
+void qmt_lqes_sort(const struct lu_env *env)
+{
+       sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
+       /* global lqe was moved during sorting */
+       if (!qti_lqes_glbl(env)->lqe_is_global) {
+               int i;
+               for (i = 0; i < qti_lqes_cnt(env); i++) {
+                       if (qti_lqes(env)[i]->lqe_is_global) {
+                               qti_glbl_lqe_idx(env) = i;
+                               break;
+                       }
+               }
+       }
+}
+
+/*
+ * Gather the lqes for \a qid from all pools of \a rtype that have at
+ * least one slave, regardless of slave index. Collected entries stay
+ * in the per-thread qti_lqes array and must be released by the caller
+ * with qti_lqes_fini() — including on error return.
+ *
+ * \retval 0, or the last lookup error; entries found before/after an
+ *         error are still added, so callers may process a partial set
+ */
+int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
+                             int rtype, int qtype, union lquota_id *qid)
+{
+       struct qmt_pool_info    *pos;
+       struct lquota_entry     *lqe;
+       int rc = 0;
+
+       qti_lqes_init(env);
+       down_read(&qmt->qmt_pool_lock);
+       if (list_empty(&qmt->qmt_pool_list)) {
+               up_read(&qmt->qmt_pool_lock);
+               /* qti_lqes stays initialized: callers still call
+                * qti_lqes_fini() on this path */
+               RETURN(-ENOENT);
+       }
+
+       list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
+               if (pos->qpi_rtype != rtype)
+                       continue;
+               /* Don't take into account pools without slaves */
+               if (!qpi_slv_nr(pos, qtype))
+                       continue;
+               lqe = lqe_find(env, pos->qpi_site[qtype], qid);
+               /* ENOENT is valid case for lqe from non global pool
+                * that hasn't limits, i.e. not enforced. Continue even
+                * in case of error - we can handle already found lqes */
+               if (IS_ERR_OR_NULL(lqe)) {
+                       /* let know that something went wrong */
+                       rc = lqe ? PTR_ERR(lqe) : -ENOENT;
+                       continue;
+               }
+               if (!lqe->lqe_enforced) {
+                       /* no settings for this qid_uid */
+                       lqe_putref(lqe);
+                       continue;
+               }
+               qti_lqes_add(env, lqe);
+               CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
+                                lqe, pos->qpi_name);
+       }
+       up_read(&qmt->qmt_pool_lock);
+       RETURN(rc);
+}
+
+/**
+ * Create a new quota pool on the QMT.
+ *
+ * Allocate and prepare a qmt_pool_info for \a poolname. Only the data
+ * (LQUOTA_RES_DT) resource is created for now; MDT pools are not yet
+ * supported (see the comment before qmt_pool_alloc() below). The pool
+ * is freed when its reference count drops to zero.
+ *
+ * \param[in] obd      QMT OBD device on which to create the pool
+ * \param[in] poolname the name of the pool to be created
+ *
+ * \retval             0 in case of success
+ * \retval             -EEXIST if the pool already exists (a valid case
+ *                     when several MDTs are mounted on the same node)
+ * \retval             other negative error code in case of error
+ */
+int qmt_pool_new(struct obd_device *obd, char *poolname)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info *qpi;
+       struct lu_env env;
+       int rc;
+       ENTRY;
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (!IS_ERR(qpi)) {
+               /* Valid case when several MDTs are mounted
+                * at the same node. */
+               CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
+               qpi_putref(&env, qpi);
+               GOTO(out_env, rc = -EEXIST);
+       }
+       if (PTR_ERR(qpi) != -ENOENT) {
+               CWARN("%s: pool %s lookup failed: rc = %ld\n",
+                     obd->obd_name, poolname, PTR_ERR(qpi));
+               GOTO(out_env, rc = PTR_ERR(qpi));
+       }
+
+       /* Now allocate and prepare only DATA pool.
+        * Further when MDT pools will be ready we need to add
+        * a cycle here and setup pools of both types. Another
+        * approach is to find out pool of which type should be
+        * created. */
+       rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
+       if (rc) {
+               CERROR("%s: can't alloc pool %s: rc = %d\n",
+                      obd->obd_name, poolname, rc);
+               GOTO(out_env, rc);
+       }
+
+       rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
+       if (rc) {
+               CERROR("%s: can't prepare pool for %s: rc = %d\n",
+                      obd->obd_name, poolname, rc);
+               GOTO(out_err, rc);
+       }
+
+       CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
+              poolname);
+
+       GOTO(out_env, rc);
+out_err:
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (!IS_ERR(qpi)) {
+               /* drop the lookup reference plus the initial allocation
+                * reference so the half-created pool is freed */
+               qpi_putref(&env, qpi);
+               qpi_putref(&env, qpi);
+       }
+out_env:
+       lu_env_fini(&env);
+       return rc;
+}
+
+/* Walk one slave index object and accumulate, per quota ID, the space
+ * this slave has granted into lqe_recalc_granted of the pool \a site.
+ *
+ * \param obj    - slave index object found in the global pool
+ * \param thread - recalc thread, polled for a stop request between keys
+ * \param site   - quota site of the pool being recalculated
+ *
+ * \retval iterator status (0 or positive at end of index), negative
+ *         errno on failure; the caller currently ignores it
+ */
+static int
+qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
+              struct ptlrpc_thread *thread, struct lquota_site *site)
+{
+       struct qmt_thread_info *qti = qmt_info(env);
+       union lquota_id *qid = &qti->qti_id;
+       const struct dt_it_ops *iops;
+       struct dt_key *key;
+       struct dt_it *it;
+       __u64 granted;
+       int rc;
+       ENTRY;
+
+       iops = &obj->do_index_ops->dio_it;
+
+       it = iops->init(env, obj, 0);
+       if (IS_ERR(it)) {
+               CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
+                     PFID(&qti->qti_fid), PTR_ERR(it));
+               RETURN(PTR_ERR(it));
+       }
+
+       rc = iops->load(env, it, 0);
+       if (rc < 0) {
+               CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
+                     PFID(&qti->qti_fid), rc);
+               GOTO(out, rc);
+       } else if (rc == 0) {
+               rc = iops->next(env, it);
+               if (rc != 0)
+                       GOTO(out, rc = (rc < 0) ? rc : 0);
+       }
+
+       do {
+               struct lquota_entry *lqe;
+
+               key = iops->key(env, it);
+               if (IS_ERR(key)) {
+                       CWARN("quota: error key for "DFID": rc = %ld\n",
+                             PFID(&qti->qti_fid), PTR_ERR(key));
+                       GOTO(out, rc = PTR_ERR(key));
+               }
+
+               /* skip the root user/group */
+               if (*((__u64 *)key) == 0)
+                       goto next;
+
+               qid->qid_uid = *((__u64 *)key);
+
+               /* never consume "granted" uninitialized: skip this ID
+                * both when the slave record can't be read and when
+                * nothing was granted */
+               granted = 0;
+               rc = qmt_slv_read(env, qid, obj, &granted);
+               if (rc || !granted)
+                       goto next;
+
+               lqe = lqe_locate(env, site, qid);
+               if (IS_ERR(lqe))
+                       GOTO(out, rc = PTR_ERR(lqe));
+               lqe_write_lock(lqe);
+               lqe->lqe_recalc_granted += granted;
+               lqe_write_unlock(lqe);
+               lqe_putref(lqe);
+next:
+               rc = iops->next(env, it);
+               if (rc < 0)
+                       CWARN("quota: failed to parse index "DFID
+                             ", ->next error: rc = %d\n",
+                             PFID(&qti->qti_fid), rc);
+       } while (rc == 0 && thread_is_running(thread));
+
+out:
+       iops->put(env, it);
+       iops->fini(env, it);
+       RETURN(rc);
+}
+
+/* cfs_hash_for_each() callback run at the end of a recalc pass: when
+ * the freshly re-summed lqe_recalc_granted disagrees with lqe_granted,
+ * adopt the new value, re-adjust qunit/edquot, notify slaves if needed
+ * and persist the entry to the global index. lqe_recalc_granted is
+ * always reset to 0 so the next pass starts from scratch.
+ * NOTE(review): write/transaction errors are swallowed (the callback
+ * always returns 0), so the on-disk copy may stay stale — confirm this
+ * best-effort behaviour is intended.
+ * NOTE(review): the transaction is created/stopped while holding
+ * lqe_write_lock — verify this lock permits sleeping. */
+static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                             struct hlist_node *hnode, void *data)
+{
+       struct lquota_entry     *lqe;
+       struct lu_env *env = data;
+
+       lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+       LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+
+       lqe_write_lock(lqe);
+       if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
+               struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
+               struct thandle *th;
+               bool need_notify = false;
+               int rc;
+
+               LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
+                            lqe->lqe_recalc_granted);
+               lqe->lqe_granted = lqe->lqe_recalc_granted;
+               /* Always returns true, if there is no slaves in a pool */
+               need_notify |= qmt_adjust_qunit(env, lqe);
+               need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
+               if (need_notify) {
+                       /* Find all lqes with lqe_id to reseed lgd array */
+                       rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
+                                               lqe_qtype(lqe), &lqe->lqe_id);
+                       if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
+                               qmt_seed_glbe(env,
+                                       qti_lqes_glbl(env)->lqe_glbl_data);
+                               qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
+                       }
+                       qti_lqes_fini(env);
+               }
+               th = dt_trans_create(env, qmt->qmt_child);
+               if (IS_ERR(th))
+                       goto out;
+
+               rc = lquota_disk_declare_write(env, th,
+                                              LQE_GLB_OBJ(lqe),
+                                              &lqe->lqe_id);
+               if (rc)
+                       GOTO(out_stop, rc);
+
+               rc = dt_trans_start_local(env, qmt->qmt_child, th);
+               if (rc)
+                       GOTO(out_stop, rc);
+
+               qmt_glb_write(env, th, lqe, 0, NULL);
+out_stop:
+               dt_trans_stop(env, qmt->qmt_child, th);
+       }
+out:
+       lqe->lqe_recalc_granted = 0;
+       lqe_write_unlock(lqe);
+
+       return 0;
+}
+
+#define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
+/* Resolve the MGC obd of this filesystem by deriving the
+ * "fsname-MDT0000" mount name from the QMT server name. Used by the
+ * recalc thread to wait until mgs config processing is finished.
+ * The lsi reference is dropped before returning, so the returned obd
+ * is not pinned by the caller.
+ * NOTE(review): confirm the MGC cannot disappear while
+ * qmt_pool_recalc() is polling obd_process_conf. */
+static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
+{
+       char mdt_name[MDT_DEV_NAME_LEN];
+       struct lustre_mount_info *lmi;
+       struct obd_device *obd;
+       int rc;
+       ENTRY;
+
+       rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
+       if (rc) {
+               CERROR("quota: cannot get server name from %s: rc = %d\n",
+                      qmt->qmt_svname, rc);
+               RETURN(ERR_PTR(rc));
+       }
+
+       strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
+       lmi = server_get_mount(mdt_name);
+       if (lmi == NULL) {
+               rc = -ENOENT;
+               CERROR("%s: cannot get mount info from %s: rc = %d\n",
+                      qmt->qmt_svname, mdt_name, rc);
+               RETURN(ERR_PTR(rc));
+       }
+       obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
+       lustre_put_lsi(lmi->lmi_sb);
+
+       RETURN(obd);
+}
+
+/* Recalculation thread body: for every quota type and every slave in
+ * the pool, re-sum the space granted per ID (qmt_obj_recalc) and then
+ * reconcile the site's lqes with the new totals (qmt_site_recalc_cb).
+ * Runs with the pool's slave array read-locked and qpi_recalc_sem held
+ * for write so pool membership cannot change underneath it. Drops the
+ * pool reference taken by qmt_start_pool_recalc() before exiting. */
+static int qmt_pool_recalc(void *args)
+{
+       struct qmt_pool_info *pool, *glbl_pool;
+       struct rw_semaphore *sem = NULL;
+       struct obd_device *obd;
+       struct lu_env env;
+       int i, rc, qtype, slaves_cnt;
+       ENTRY;
+
+       pool = args;
+       thread_set_flags(&pool->qpi_recalc_thread, SVC_RUNNING);
+
+       obd = qmt_get_mgc(pool->qpi_qmt);
+       if (IS_ERR(obd))
+               GOTO(out, rc = PTR_ERR(obd));
+       else
+               /* Waiting for the end of processing mgs config.
+                * It is needed to be sure all pools are configured. */
+               while (obd->obd_process_conf)
+                       schedule_timeout_uninterruptible(cfs_time_seconds(1));
+
+       sem = qmt_sarr_rwsem(pool);
+       LASSERT(sem);
+       down_read(sem);
+       /* Hold this to be sure that OSTs from this pool
+        * can't do acquire/release.
+        *
+        * I guess below write semaphore could be a bottleneck
+        * as qmt_dqacq would be blocked trying to hold
+        * read_lock at qmt_pool_lookup->qti_pools_add.
+        * But on the other hand adding/removing OSTs to the pool is
+        * a rare operation. If finally this would be a problem,
+        * we can consider another approach. For example we can
+        * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
+        * and go through appropriate OSTs. I don't use this approach now
+        * as newly created pool hasn't lqes entries. So firstly we need
+        * to get this lqes from the global pool index file. This
+        * solution looks more complex, so leave it as it is. */
+       down_write(&pool->qpi_recalc_sem);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+               GOTO(out, rc);
+       }
+
+       glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
+       if (IS_ERR(glbl_pool))
+               GOTO(out_env, rc = PTR_ERR(glbl_pool));
+
+       slaves_cnt = qmt_sarr_count(pool);
+       CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
+              slaves_cnt, pool->qpi_name);
+
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               for (i = 0; i < slaves_cnt; i++) {
+                       struct qmt_thread_info  *qti = qmt_info(&env);
+                       struct dt_object *slv_obj;
+                       struct obd_uuid uuid;
+                       int idx;
+
+                       if (thread_is_stopping(&pool->qpi_recalc_thread))
+                               GOTO(out_stop, rc = 0);
+                       idx = qmt_sarr_get_idx(pool, i);
+                       LASSERT(idx >= 0);
+
+                       /* We don't need fsname here - anyway
+                        * lquota_disk_slv_filename ignores it. */
+                       snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+                       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+                                           qtype);
+                       /* look-up index file associated with acquiring slave */
+                       slv_obj = lquota_disk_slv_find(&env,
+                                               glbl_pool->qpi_qmt->qmt_child,
+                                               glbl_pool->qpi_root,
+                                               &qti->qti_fid,
+                                               &uuid);
+                       if (IS_ERR(slv_obj))
+                               GOTO(out_stop, rc = PTR_ERR(slv_obj));
+
+                       CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
+                              slv_obj, uuid.uuid);
+                       qmt_obj_recalc(&env, slv_obj,
+                                      &pool->qpi_recalc_thread,
+                                      pool->qpi_site[qtype]);
+                       dt_object_put(&env, slv_obj);
+               }
+               /* Now go through the site hash and compare lqe_granted
+                * with lqe_recalc_granted. Write new value if disagree */
+
+               cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
+                                 qmt_site_recalc_cb, &env);
+       }
+       GOTO(out_stop, rc);
+out_stop:
+       qpi_putref(&env, glbl_pool);
+out_env:
+       lu_env_fini(&env);
+out:
+       thread_set_flags(&pool->qpi_recalc_thread, SVC_STOPPED);
+       wake_up(&pool->qpi_recalc_thread.t_ctl_waitq);
+       clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
+       /* Pool can't be changed, since sem has been down.
+        * Thus until up_read, no one can restart recalc thread. */
+       if (sem) {
+               up_read(sem);
+               up_write(&pool->qpi_recalc_sem);
+       }
+       /* NOTE(review): qpi_putref() is called with &env AFTER
+        * lu_env_fini(&env), and on the early GOTO(out, ...) paths env
+        * was never initialized at all — if this drops the last pool
+        * reference, teardown would run with an invalid env. Needs
+        * restructuring so the reference is dropped while env is valid. */
+       qpi_putref(&env, pool);
+
+       return rc;
+}
+
+/* Start the per-pool recalculation thread unless one is already running.
+ *
+ * QPI_FLAG_RECALC_OFFSET is the "thread already started" latch:
+ * test_and_set_bit() guarantees at most one recalc thread per pool, and
+ * the bit must be cleared again on every failure path so a later
+ * attempt can succeed. The extra pool reference taken here is dropped
+ * by qmt_pool_recalc() itself.
+ *
+ * \retval 0 on success or when a recalc thread is already running
+ * \retval negative errno if the thread could not be spawned
+ */
+static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
+{
+       struct task_struct *task;
+       char *name;
+       int rc = 0;
+
+       if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
+               LASSERT(thread_is_stopped(&qpi->qpi_recalc_thread) ||
+                       thread_is_init(&qpi->qpi_recalc_thread));
+               OBD_ALLOC(name, QPI_MAXNAME + sizeof("qmt_pool_recalc_"));
+               if (name == NULL) {
+                       /* release the latch, otherwise recalc could never
+                        * be restarted for this pool */
+                       clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
+                       RETURN(-ENOMEM);
+               }
+
+               /* use the full buffer size and a prefix matching what the
+                * buffer was sized for ("qsd_reint_" was a copy-paste
+                * slip from the reintegration thread) */
+               snprintf(name, QPI_MAXNAME + sizeof("qmt_pool_recalc_"),
+                        "qmt_pool_recalc_%s", qpi->qpi_name);
+
+               qpi_getref(qpi);
+               thread_set_flags(&qpi->qpi_recalc_thread, SVC_STARTING);
+               task = kthread_run(qmt_pool_recalc, qpi, name);
+               if (IS_ERR(task)) {
+                       thread_set_flags(&qpi->qpi_recalc_thread, SVC_STOPPED);
+                       clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
+                       rc = PTR_ERR(task);
+                       qpi_putref(env, qpi);
+               }
+               OBD_FREE(name, QPI_MAXNAME + sizeof("qmt_pool_recalc_"));
+       }
+
+       RETURN(rc);
+}
+
+/* Request a running recalc thread to stop and wait until it has exited;
+ * a no-op when the thread is already stopped. */
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
+{
+       struct ptlrpc_thread *thread = &qpi->qpi_recalc_thread;
+
+       if (thread_is_stopped(thread))
+               return;
+
+       thread_set_flags(thread, SVC_STOPPING);
+       wake_up(&thread->t_ctl_waitq);
+       wait_event_idle(thread->t_ctl_waitq, thread_is_stopped(thread));
+}
+
+/* Adjust pool->qpi_slv_nr[QMT_STYPE_OST][*] after OST \a idx was added
+ * to or removed from \a pool, counting only the quota types whose slave
+ * index file already exists in the global pool (i.e. the slave has
+ * connected before).
+ * NOTE(review): the global pool is looked up with a hardcoded
+ * LQUOTA_RES_DT while the FID is generated from pool->qpi_rtype —
+ * consistent today because only data pools exist, but worth unifying
+ * once MDT pools appear. Always returns 0. */
+static int qmt_pool_slv_nr_change(const struct lu_env *env,
+                                 struct qmt_pool_info *pool,
+                                 int idx, bool add)
+{
+       struct qmt_pool_info *glbl_pool;
+       int qtype;
+
+       glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
+       if (IS_ERR(glbl_pool))
+               RETURN(PTR_ERR(glbl_pool));
+
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               struct qmt_thread_info  *qti = qmt_info(env);
+               struct dt_object *slv_obj;
+               struct obd_uuid uuid;
+
+               /* We don't need fsname here - anyway
+                * lquota_disk_slv_filename ignores it. */
+               snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+               lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+                                   qtype);
+               /* look-up index file associated with acquiring slave */
+               slv_obj = lquota_disk_slv_find(env,
+                                       glbl_pool->qpi_qmt->qmt_child,
+                                       glbl_pool->qpi_root,
+                                       &qti->qti_fid,
+                                       &uuid);
+               if (IS_ERR(slv_obj))
+                       continue;
+
+               if (add)
+                       pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
+               else
+                       pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
+               dt_object_put(env, slv_obj);
+       }
+       qpi_putref(env, glbl_pool);
+
+       return 0;
+}
+
+/* Shared implementation for adding/removing one OST to/from a quota
+ * pool.
+ *
+ * \param[in] obd       QMT OBD device
+ * \param[in] poolname  name of an existing quota pool
+ * \param[in] slavename target name ("fsname-OSTxxxx"); only OSTs are
+ *                      valid pool members for now
+ * \param[in] add       true to add the slave, false to remove it
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
+                           char *slavename, bool add)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info    *qpi;
+       struct lu_env            env;
+       int                      rc, idx;
+       ENTRY;
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
+                             "%s: pool %s, removing %s\n",
+             obd->obd_name, poolname, slavename);
+
+       /* only OST targets may be members of a data quota pool */
+       rc = server_name2index(slavename, &idx, NULL);
+       if (rc != LDD_F_SV_TYPE_OST)
+               RETURN(-EINVAL);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (IS_ERR(qpi)) {
+               /* %ld for PTR_ERR(): the former "%long" printed the
+                * error in octal ("%lo") followed by a literal "ng" */
+               CWARN("%s: can't find pool %s: rc = %ld\n",
+                     obd->obd_name, poolname, PTR_ERR(qpi));
+               GOTO(out, rc = PTR_ERR(qpi));
+       }
+
+       rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
+                  qmt_sarr_pool_rem(qpi, idx);
+       if (rc) {
+               /* arguments follow the format: device name comes first */
+               CERROR("%s: can't %s %s pool %s: rc = %d\n",
+                      obd->obd_name, add ? "add to" : "remove from",
+                      slavename, poolname, rc);
+               GOTO(out_putref, rc);
+       }
+       qmt_pool_slv_nr_change(&env, qpi, idx, add);
+       qmt_start_pool_recalc(&env, qpi);
+
+out_putref:
+       qpi_putref(&env, qpi);
+out:
+       lu_env_fini(&env);
+       RETURN(rc);
+}
+
+
+
+/**
+ * Add a single target device to the named pool.
+ *
+ * Thin wrapper around qmt_pool_add_rem() with add = true.
+ *
+ * \param[in] obd      OBD device on which to add the pool
+ * \param[in] poolname name of the pool to which to add the target \a slavename
+ * \param[in] slavename        name of the target device to be added
+ *
+ * \retval             0 if \a slavename was (previously) added to the pool
+ * \retval             negative error number on failure
+ */
+int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
+{
+       return qmt_pool_add_rem(obd, poolname, slavename, true);
+}
+
+/**
+ * Remove the named target from the specified pool.
+ *
+ * \param[in] obd      OBD device from which to remove \a poolname
+ * \param[in] poolname name of the pool to be changed
+ * \param[in] slavename        name of the target to remove from \a poolname
+ *
+ * \retval             0 on successfully removing \a slavename from the pool
+ * \retval             negative number on error (e.g. \a slavename not in pool)
+ */
+int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
+{
+       /* thin wrapper over the common helper in "remove" mode */
+       return qmt_pool_add_rem(obd, poolname, slavename, false);
+}
+
+/**
+ * Remove the named pool from the QMT device.
+ *
+ * Unlinks the per-qtype global index files under the pool directory,
+ * drops the pool references so it is freed, then unlinks the pool
+ * directory itself from the quota master root.
+ *
+ * \param[in] obd      OBD device on which pool was previously created
+ * \param[in] poolname name of pool to remove from \a obd
+ *
+ * \retval             0 on successfully removing the pool
+ * \retval             negative error numbers for failures
+ */
+int qmt_pool_del(struct obd_device *obd, char *poolname)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
+       struct qmt_pool_info    *qpi;
+       struct lu_fid            fid;
+       char                     buf[LQUOTA_NAME_MAX];
+       struct lu_env            env;
+       int                      rc;
+       int                      qtype;
+       ENTRY;
+
+       if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+               RETURN(-ENAMETOOLONG);
+
+       CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
+              poolname);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       /* look-up pool in charge of this global index FID */
+       qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+       if (IS_ERR(qpi)) {
+               /* Valid case for several MDTs at the same node -
+                * pool removed by the 1st MDT in config */
+               CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
+               lu_env_fini(&env);
+               RETURN(PTR_ERR(qpi));
+       }
+
+       /* unlink the global index file of each quota type (usr/grp/prj);
+        * failure is only warned so the remaining files still get removed */
+       for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+               lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
+               snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
+               rc = local_object_unlink(&env, qmt->qmt_child,
+                                        qpi->qpi_root, buf);
+               if (rc)
+                       CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
+                             obd->obd_name, buf, poolname, rc);
+       }
+
+       /* put ref from look-up */
+       qpi_putref(&env, qpi);
+       /* put last ref to free qpi */
+       qpi_putref(&env, qpi);
+
+       /* remove the pool directory itself, e.g. "dt-<poolname>" */
+       snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
+                RES_NAME(LQUOTA_RES_DT), poolname);
+       rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
+       if (rc)
+               CWARN("%s: cannot unlink dir %s: rc = %d\n",
+                     obd->obd_name, poolname, rc);
+
+       /* unlink failures were only warned above, so return success */
+       RETURN(0);
+}
+
+/* Initialize the slave index array of a pool.  Only LQUOTA_RES_DT (OST)
+ * pools keep an array; the global pool implicitly contains all slaves. */
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
+{
+
+       /* No need to initialize sarray for global pool
+        * as it always includes all slaves */
+       if (qmt_pool_global(qpi))
+               return 0;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return tgt_pool_init(&qpi->qpi_sarr.osts, 0);
+       case LQUOTA_RES_MD:
+       default:
+               /* MDT pools are not supported yet */
+               return 0;
+       }
+}
+
+/* Add OST index \a idx to the pool's slave array; \a min sizes the
+ * array growth (see tgt_pool_add()).  No-op for non-DT pools. */
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+/* Remove OST index \a idx from the pool's slave array.
+ * No-op for non-DT pools. */
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+/* Release the pool's slave array.  The global pool has none, and an
+ * unallocated op_array (init failed or never ran) is skipped. */
+static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi)
+{
+       if (qmt_pool_global(qpi))
+               return 0;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               if (!qpi->qpi_sarr.osts.op_array)
+                       return 0;
+               return tgt_pool_free(&qpi->qpi_sarr.osts);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+/* Return 0 if slave index \a idx belongs to the pool.  The global pool
+ * contains every slave, so the check always succeeds for it. */
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
+{
+       if (qmt_pool_global(qpi))
+               return 0;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return tgt_check_index(idx, &qpi->qpi_sarr.osts);
+       case LQUOTA_RES_MD:
+       default:
+               return 0;
+       }
+}
+
+/* Return the rw_semaphore protecting the pool's slave array, or NULL
+ * for resource types without an array.  NOTE(review): callers appear
+ * expected to only use this on DT pools — confirm they handle NULL. */
+inline struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               /* to protect ost_pool use */
+               return &qpi->qpi_sarr.osts.op_rw_sem;
+       case LQUOTA_RES_MD:
+       default:
+               return NULL;
+       }
+}
+
+/* Map array position \a arr_idx to the slave (OST) index it holds.
+ * For the global pool positions and slave indexes coincide, so the
+ * argument is returned unchanged. */
+inline int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
+{
+
+       if (qmt_pool_global(qpi))
+               return arr_idx;
+
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
+                        "idx invalid %d op_count %d\n", arr_idx,
+                        qpi->qpi_sarr.osts.op_count);
+               return qpi->qpi_sarr.osts.op_array[arr_idx];
+       case LQUOTA_RES_MD:
+       default:
+               return -EINVAL;
+       }
+}
+
+/* Number of slaves in a pool.
+ * NOTE(review): the return type is unsigned but -EINVAL is returned in
+ * the default branch, which wraps to a huge value — callers must only
+ * use this on LQUOTA_RES_DT pools; consider an int return type. */
+inline unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
+{
+       switch (qpi->qpi_rtype) {
+       case LQUOTA_RES_DT:
+               return qpi->qpi_sarr.osts.op_count;
+       case LQUOTA_RES_MD:
+       default:
+               return -EINVAL;
+       }
+}
index 3f622b9..5ecf8fe 100644 (file)
@@ -60,9 +60,10 @@ static void qsd_lqe_init(struct lquota_entry *lqe, void *arg)
  * \param env - the environment passed by the caller
  * \param lqe - is the quota entry to refresh
  * \param arg - is the pointer to the qsd_qtype_info structure
+ * \param need_crt - needed to be compat with qmt_lqe_read
  */
 static int qsd_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
-                       void *arg)
+                       void *arg, bool need_crt)
 {
        struct qsd_thread_info *qti = qsd_info(env);
        struct qsd_qtype_info  *qqi = (struct qsd_qtype_info *)arg;
index f8011eb..ed67286 100644 (file)
@@ -786,8 +786,11 @@ out_flags:
                        lqe_read_lock(lqe);
                        usage = lqe->lqe_pending_write;
                        usage += lqe->lqe_waiting_write;
-                       if (lqe->lqe_qunit != 0 && (usage % lqe->lqe_qunit >
-                           qqi->qqi_qsd->qsd_sync_threshold))
+                       /* There is a chance to successfully grant more quota
+                        * but get edquot flag through glimpse. */
+                       if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
+                          (usage % lqe->lqe_qunit >
+                           qqi->qqi_qsd->qsd_sync_threshold)))
                                usage += qqi->qqi_qsd->qsd_sync_threshold;
 
                        usage += lqe->lqe_usage;
index f5e24a3..e058b38 100644 (file)
@@ -466,8 +466,8 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
 
-       LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu",
-                    desc->gl_qunit);
+       LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu, edquot:%d",
+                    desc->gl_qunit, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
 
        lqe_write_lock(lqe);
        lvb->lvb_id_rel = 0;
index a8165a9..c77103d 100644 (file)
@@ -36,3 +36,4 @@ EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \
 EXTRA_DIST += update_trans.c
 EXTRA_DIST += update_records.c
 EXTRA_DIST += update_recovery.c
+EXTRA_DIST += tgt_pool.c
diff --git a/lustre/target/tgt_pool.c b/lustre/target/tgt_pool.c
new file mode 100644 (file)
index 0000000..4beef3a
--- /dev/null
@@ -0,0 +1,246 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2017, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ */
+/*
+ * lustre/target/tgt_pool.c
+ *
+ * This file handles creation, lookup, and removal of pools themselves, as
+ * well as adding and removing targets to pools.
+ *
+ * Author: Jacques-Charles LAFOUCRIERE <jc.lafoucriere@cea.fr>
+ * Author: Alex Lyashkov <Alexey.Lyashkov@Sun.COM>
+ * Author: Nathaniel Rutman <Nathan.Rutman@Sun.COM>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <obd_target.h>
+#include <obd_support.h>
+
+/**
+ * Initialize the pool data structures at startup.
+ *
+ * Allocate and initialize the pool data structures with the specified
+ * array size.  If pool count is not specified (\a count == 0), then
+ * POOL_INIT_COUNT will be used.  Allocating a non-zero initial array
+ * size avoids the need to reallocate as new pools are added.
+ *
+ * \param[in] op       pool structure
+ * \param[in] count    initial size of the target op_array[] array
+ *
+ * \retval             0 indicates successful pool initialization
+ * \retval             negative error number on failure
+ */
+#define POOL_INIT_COUNT 2
+int tgt_pool_init(struct lu_tgt_pool *op, unsigned int count)
+{
+       ENTRY;
+
+       if (count == 0)
+               count = POOL_INIT_COUNT;
+       op->op_array = NULL;
+       op->op_count = 0;
+       init_rwsem(&op->op_rw_sem);
+       /* op_size is the allocated size of op_array[] in bytes,
+        * not an element count */
+       op->op_size = count * sizeof(op->op_array[0]);
+       OBD_ALLOC(op->op_array, op->op_size);
+       if (op->op_array == NULL) {
+               op->op_size = 0;
+               RETURN(-ENOMEM);
+       }
+       EXIT;
+       return 0;
+}
+EXPORT_SYMBOL(tgt_pool_init);
+
+/**
+ * Increase the op_array size to hold more targets in this pool.
+ *
+ * The size is increased to at least \a min_count, but may be larger
+ * for an existing pool since ->op_array[] is growing exponentially.
+ * Caller must hold write op_rwlock.
+ *
+ * \param[in] op       pool structure
+ * \param[in] min_count        minimum number of entries to handle
+ *
+ * \retval             0 on success
+ * \retval             negative error number on failure.
+ */
+int tgt_pool_extend(struct lu_tgt_pool *op, unsigned int min_count)
+{
+       __u32 *new;
+       __u32 new_size;
+
+       LASSERT(min_count != 0);
+
+       /* still room for at least one more entry: nothing to do */
+       if (op->op_count * sizeof(op->op_array[0]) < op->op_size)
+               return 0;
+
+       /* grow geometrically (double) to amortize reallocation cost */
+       new_size = max_t(__u32, min_count * sizeof(op->op_array[0]),
+                        2 * op->op_size);
+       OBD_ALLOC(new, new_size);
+       if (new == NULL)
+               return -ENOMEM;
+
+       /* copy old array to new one */
+       memcpy(new, op->op_array, op->op_size);
+       OBD_FREE(op->op_array, op->op_size);
+       op->op_array = new;
+       op->op_size = new_size;
+
+       return 0;
+}
+EXPORT_SYMBOL(tgt_pool_extend);
+
+/**
+ * Add a new target to an existing pool.
+ *
+ * Add a new target device to the pool previously created and returned by
+ * lod_pool_new().  Each target can only be in each pool at most one time.
+ *
+ * \param[in] op       target pool to add new entry
+ * \param[in] idx      pool index number to add to the \a op array
+ * \param[in] min_count        minimum number of entries to expect in the pool
+ *
+ * \retval             0 if target could be added to the pool
+ * \retval             negative error if target \a idx was not added
+ */
+int tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count)
+{
+       unsigned int i;
+       int rc = 0;
+       ENTRY;
+
+       /* write lock serializes concurrent add/remove on the array */
+       down_write(&op->op_rw_sem);
+
+       /* ensure room for the new entry before scanning */
+       rc = tgt_pool_extend(op, min_count);
+       if (rc)
+               GOTO(out, rc);
+
+       /* search ost in pool array */
+       for (i = 0; i < op->op_count; i++) {
+               if (op->op_array[i] == idx)
+                       GOTO(out, rc = -EEXIST);
+       }
+       /* ost not found we add it */
+       op->op_array[op->op_count] = idx;
+       op->op_count++;
+       EXIT;
+out:
+       up_write(&op->op_rw_sem);
+       return rc;
+}
+EXPORT_SYMBOL(tgt_pool_add);
+
+/**
+ * Remove an existing target from a pool.
+ *
+ * Remove the target index \a idx from the array of pool \a op.
+ * If the removed target is not the last entry, the array is
+ * compacted to close the empty slot.
+ *
+ * \param[in] op       pointer to the original data structure
+ * \param[in] idx      target index to be removed
+ *
+ * \retval             0 on success
+ * \retval             negative error number on failure (\a idx not found)
+ */
+int tgt_pool_remove(struct lu_tgt_pool *op, __u32 idx)
+{
+       unsigned int i;
+       ENTRY;
+
+       down_write(&op->op_rw_sem);
+
+       for (i = 0; i < op->op_count; i++) {
+               if (op->op_array[i] == idx) {
+                       /* shift the tail left by one to compact the array */
+                       memmove(&op->op_array[i], &op->op_array[i + 1],
+                               (op->op_count - i - 1) *
+                               sizeof(op->op_array[0]));
+                       op->op_count--;
+                       up_write(&op->op_rw_sem);
+                       EXIT;
+                       return 0;
+               }
+       }
+
+       up_write(&op->op_rw_sem);
+       RETURN(-EINVAL);
+}
+EXPORT_SYMBOL(tgt_pool_remove);
+
+/**
+ * Check whether target index \a idx is a member of pool \a osts.
+ *
+ * \param[in] idx      target index to look for
+ * \param[in] osts     pool to search
+ *
+ * \retval             0 if \a idx is found in the pool
+ * \retval             -ENOENT if \a idx is not in the pool
+ */
+int tgt_check_index(int idx, struct lu_tgt_pool *osts)
+{
+       int rc, i;
+       ENTRY;
+
+       down_read(&osts->op_rw_sem);
+       for (i = 0; i < osts->op_count; i++) {
+               if (osts->op_array[i] == idx)
+                       GOTO(out, rc = 0);
+       }
+       rc = -ENOENT;
+       EXIT;
+out:
+       up_read(&osts->op_rw_sem);
+       return rc;
+}
+EXPORT_SYMBOL(tgt_check_index);
+
+/**
+ * Free the pool after it was emptied and removed from /proc.
+ *
+ * Note that all of the child/target entries referenced by this pool
+ * must have been removed by lod_ost_pool_remove() before it can be
+ * deleted from memory.
+ *
+ * \param[in] op       pool to be freed.
+ *
+ * \retval             0 on success or if pool was already freed
+ */
+int tgt_pool_free(struct lu_tgt_pool *op)
+{
+       ENTRY;
+
+       /* op_size == 0 means never allocated or already freed */
+       if (op->op_size == 0)
+               RETURN(0);
+
+       /* take the write lock to drain any in-flight readers before
+        * tearing the array down */
+       down_write(&op->op_rw_sem);
+
+       OBD_FREE(op->op_array, op->op_size);
+       op->op_array = NULL;
+       op->op_count = 0;
+       op->op_size = 0;
+
+       up_write(&op->op_rw_sem);
+       RETURN(0);
+}
+EXPORT_SYMBOL(tgt_pool_free);
index 5b08e95..d2c07d8 100755 (executable)
@@ -39,6 +39,12 @@ BLK_SZ=1024
 MAX_DQ_TIME=604800
 MAX_IQ_TIME=604800
 QTYPE="ugp"
+# QP exists since this version. Should be finally set before landing.
+VERSION_WITH_QP="2.13.53"
+# skip the calling test unless the MDS is new enough to support quota pools
+mds_supports_qp() {
+       [ $MDS1_VERSION -lt $(version_code $VERSION_WITH_QP) ] &&
+               skip "Needs MDS version $VERSION_WITH_QP or later."
+}
 
 require_dsh_mds || exit 0
 require_dsh_ost || exit 0
@@ -161,14 +167,17 @@ quota_log() {
 
 # get quota for a user or a group
 # usage: getquota -u|-g|-p <username>|<groupname>|<projid> global|<obd_uuid> \
-#                bhardlimit|bsoftlimit|bgrace|ihardlimit|isoftlimit|igrace
+#                bhardlimit|bsoftlimit|bgrace|ihardlimit|isoftlimit|igrace \
+#                <pool_name>
 getquota() {
        local spec
        local uuid
+       local pool_arg
 
        sync_all_data > /dev/null 2>&1 || true
 
-       [ "$#" != 4 ] && error "getquota: wrong number of arguments: $#"
+       [ "$#" != 4 -a "$#" != 5 ] &&
+               error "getquota: wrong number of arguments: $#"
        [ "$1" != "-u" -a "$1" != "-g" -a "$1" != "-p" ] &&
                error "getquota: wrong u/g/p specifier $1 passed"
 
@@ -186,9 +195,10 @@ getquota() {
                *)          error "unknown quota parameter $4";;
        esac
 
+       [ ! -z "$5" ] && pool_arg="--pool $5 "
        [ "$uuid" = "global" ] && uuid=$DIR
 
-       $LFS quota -v "$1" "$2" $DIR |
+       $LFS quota -v "$1" "$2" $pool_arg $DIR |
                awk 'BEGIN { num='$spec' } { if ($1 == "'$uuid'") \
                { if (NF == 1) { getline } else { num++ } ; print $num;} }' \
                | tr -d "*"
@@ -325,8 +335,10 @@ wait_ost_reint() {
 wait_grace_time() {
        local qtype=$1
        local flavour=$2
-       local extrasleep=${3:-5}
+       local pool=${3:-}
+       local extrasleep=${4:-5}
        local qarg
+       local parg
 
        case $qtype in
                u|g) qarg=$TSTUSR ;;
@@ -334,9 +346,15 @@ wait_grace_time() {
                *) error "get_grace_time: Invalid quota type: $qtype"
        esac
 
+       if [ $pool ]; then
+               parg="--pool "$pool
+               echo "Quota info for $pool:"
+               $LFS quota -$qtype $qarg $parg $DIR
+       fi
+
        case $flavour in
                block)
-                       time=$(lfs quota -$qtype $qarg $DIR|
+                       time=$(lfs quota -$qtype $qarg $parg $DIR|
                                   awk 'NR == 3{ print $5 }'| sed 's/s$//')
                        ;;
                file)
@@ -371,7 +389,6 @@ setup_quota_test() {
 }
 
 cleanup_quota_test() {
-       trap 0
        echo "Delete files..."
        rm -rf $DIR/$tdir
        echo "Wait for unlink objects finished..."
@@ -451,7 +468,7 @@ reset_quota_settings() {
 
 # enable quota debug
 quota_init() {
-       do_nodes $(comma_list $(nodes_list)) "lctl set_param debug=+quota"
+       do_nodes $(comma_list $(nodes_list)) "lctl set_param debug=+quota+trace"
 }
 quota_init
 reset_quota_settings
@@ -507,10 +524,36 @@ test_0() {
 }
 run_test 0 "Test basic quota performance"
 
+# usage: test_1_check_write tfile user|group|project
+test_1_check_write() {
+       local testfile="$1"
+       local qtype="$2"
+       local limit=$3
+       local short_qtype=${qtype:0:1}
+
+       log "Write..."
+       $RUNAS $DD of=$testfile count=$((limit/2)) ||
+               quota_error $short_qtype $TSTUSR \
+                       "$qtype write failure, but expect success"
+       log "Write out of block quota ..."
+       # this time maybe cache write,  ignore it's failure
+       $RUNAS $DD of=$testfile count=$((limit/2)) seek=$((limit/2)) || true
+       # flush cache, ensure noquota flag is set on client
+       cancel_lru_locks osc
+       sync; sync_all_data || true
+       # sync means client wrote all it's cache, but id doesn't
+       # garantee that slave got new edquot trough glimpse.
+       # so wait a little to be sure slave got it.
+       sleep 5
+       $RUNAS $DD of=$testfile count=1 seek=$limit &&
+               quota_error $short_qtype $TSTUSR \
+                       "user write success, but expect EDQUOT"
+}
+
 # test block hardlimit
-test_1() {
-       local LIMIT=10  # 10M
-       local TESTFILE="$DIR/$tdir/$tfile-0"
+test_1a() {
+       local limit=10  # 10M
+       local testfile="$DIR/$tdir/$tfile-0"
 
        setup_quota_test || error "setup quota failed with $?"
        trap cleanup_quota_test EXIT
@@ -519,66 +562,47 @@ test_1() {
        set_ost_qtype $QTYPE || error "enable ost quota failed"
 
        # test for user
-       log "User quota (block hardlimit:$LIMIT MB)"
-       $LFS setquota -u $TSTUSR -b 0 -B ${LIMIT}M -i 0 -I 0 $DIR ||
+       log "User quota (block hardlimit:$limit MB)"
+       $LFS setquota -u $TSTUSR -b 0 -B ${limit}M -i 0 -I 0 $DIR ||
                error "set user quota failed"
 
        # make sure the system is clean
-       local USED=$(getquota -u $TSTUSR global curspace)
-       [ $USED -ne 0 ] && error "Used space($USED) for user $TSTUSR isn't 0."
+       local used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
 
-       $LFS setstripe $TESTFILE -c 1 || error "setstripe $TESTFILE failed"
-       chown $TSTUSR.$TSTUSR $TESTFILE || error "chown $TESTFILE failed"
+       $LFS setstripe $testfile -c 1 || error "setstripe $testfile failed"
+       chown $TSTUSR.$TSTUSR $testfile || error "chown $testfile failed"
 
-       log "Write..."
-       $RUNAS $DD of=$TESTFILE count=$((LIMIT/2)) ||
-               quota_error u $TSTUSR "user write failure, but expect success"
-       log "Write out of block quota ..."
-       # this time maybe cache write,  ignore it's failure
-       $RUNAS $DD of=$TESTFILE count=$((LIMIT/2)) seek=$((LIMIT/2)) || true
-       # flush cache, ensure noquota flag is set on client
-       cancel_lru_locks osc
-       sync; sync_all_data || true
-       $RUNAS $DD of=$TESTFILE count=1 seek=$LIMIT &&
-               quota_error u $TSTUSR "user write success, but expect EDQUOT"
+       test_1_check_write $testfile "user" $limit
 
-       rm -f $TESTFILE
+       rm -f $testfile
        wait_delete_completed || error "wait_delete_completed failed"
        sync_all_data || true
-       USED=$(getquota -u $TSTUSR global curspace)
-       [ $USED -ne 0 ] && quota_error u $TSTUSR \
+       used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && quota_error u $TSTUSR \
                "user quota isn't released after deletion"
        resetquota -u $TSTUSR
 
        # test for group
        log "--------------------------------------"
-       log "Group quota (block hardlimit:$LIMIT MB)"
-       $LFS setquota -g $TSTUSR -b 0 -B ${LIMIT}M -i 0 -I 0 $DIR ||
+       log "Group quota (block hardlimit:$limit MB)"
+       $LFS setquota -g $TSTUSR -b 0 -B ${limit}M -i 0 -I 0 $DIR ||
                error "set group quota failed"
 
-       TESTFILE="$DIR/$tdir/$tfile-1"
+       testfile="$DIR/$tdir/$tfile-1"
        # make sure the system is clean
-       USED=$(getquota -g $TSTUSR global curspace)
-       [ $USED -ne 0 ] && error "Used space ($USED) for group $TSTUSR isn't 0"
+       used=$(getquota -g $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space ($used) for group $TSTUSR isn't 0"
 
-       $LFS setstripe $TESTFILE -c 1 || error "setstripe $TESTFILE failed"
-       chown $TSTUSR.$TSTUSR $TESTFILE || error "chown $TESTFILE failed"
+       $LFS setstripe $testfile -c 1 || error "setstripe $testfile failed"
+       chown $TSTUSR.$TSTUSR $testfile || error "chown $testfile failed"
 
-       log "Write ..."
-       $RUNAS $DD of=$TESTFILE count=$((LIMIT/2)) ||
-               quota_error g $TSTUSR "Group write failure, but expect success"
-       log "Write out of block quota ..."
-       # this time maybe cache write, ignore it's failure
-       $RUNAS $DD of=$TESTFILE count=$((LIMIT/2)) seek=$((LIMIT/2)) || true
-       cancel_lru_locks osc
-       sync; sync_all_data || true
-       $RUNAS $DD of=$TESTFILE count=10 seek=$LIMIT &&
-               quota_error g $TSTUSR "Group write success, but expect EDQUOT"
-       rm -f $TESTFILE
+       test_1_check_write $testfile "group" $limit
+       rm -f $testfile
        wait_delete_completed || error "wait_delete_completed failed"
        sync_all_data || true
-       USED=$(getquota -g $TSTUSR global curspace)
-       [ $USED -ne 0 ] && quota_error g $TSTUSR \
+       used=$(getquota -g $TSTUSR global curspace)
+       [ $used -ne 0 ] && quota_error g $TSTUSR \
                                "Group quota isn't released after deletion"
        resetquota -g $TSTUSR
 
@@ -588,41 +612,384 @@ test_1() {
                return 0
        fi
 
-       TESTFILE="$DIR/$tdir/$tfile-2"
+       testfile="$DIR/$tdir/$tfile-2"
        # make sure the system is clean
-       USED=$(getquota -p $TSTPRJID global curspace)
-       [ $USED -ne 0 ] &&
-               error "used space($USED) for project $TSTPRJID isn't 0"
+       used=$(getquota -p $TSTPRJID global curspace)
+       [ $used -ne 0 ] &&
+               error "used space($used) for project $TSTPRJID isn't 0"
 
        # test for Project
        log "--------------------------------------"
-       log "Project quota (block hardlimit:$LIMIT mb)"
-       $LFS setquota -p $TSTPRJID -b 0 -B ${LIMIT}M -i 0 -I 0 $DIR ||
+       log "Project quota (block hardlimit:$limit mb)"
+       $LFS setquota -p $TSTPRJID -b 0 -B ${limit}M -i 0 -I 0 $DIR ||
                error "set project quota failed"
 
-       $LFS setstripe $TESTFILE -c 1 || error "setstripe $TESTFILE failed"
-       chown $TSTUSR:$TSTUSR $TESTFILE || error "chown $TESTFILE failed"
-       change_project -p $TSTPRJID $TESTFILE
-
-       log "write ..."
-       $RUNAS $DD of=$TESTFILE count=$((LIMIT/2)) || quota_error p $TSTPRJID \
-               "project write failure, but expect success"
-       log "write out of block quota ..."
-       # this time maybe cache write, ignore it's failure
-       $RUNAS $DD of=$TESTFILE count=$((LIMIT/2)) seek=$((LIMIT/2)) || true
-       cancel_lru_locks osc
-       sync; sync_all_data || true
-       $RUNAS $DD of=$TESTFILE count=10 seek=$LIMIT && quota_error p \
-               $TSTPRJID "project write success, but expect EDQUOT"
+       $LFS setstripe $testfile -c 1 || error "setstripe $testfile failed"
+       chown $TSTUSR:$TSTUSR $testfile || error "chown $testfile failed"
+       change_project -p $TSTPRJID $testfile
+
+       test_1_check_write $testfile "project" $limit
 
        # cleanup
        cleanup_quota_test
 
-       USED=$(getquota -p $TSTPRJID global curspace)
-       [ $USED -eq 0 ] || quota_error p $TSTPRJID \
+       used=$(getquota -p $TSTPRJID global curspace)
+       [ $used -ne 0 ] && quota_error p $TSTPRJID \
+               "project quota isn't released after deletion"
+
+       resetquota -p $TSTPRJID
+}
+run_test 1a "Block hard limit (normal use and out of quota)"
+
+test_1b() {
+       local limit=10  # 10M
+       local global_limit=20  # 20M
+       local testfile="$DIR/$tdir/$tfile-0"
+       local qpool="qpool1"
+
+       mds_supports_qp
+       setup_quota_test || error "setup quota failed with $?"
+       stack_trap cleanup_quota_test EXIT
+
+       # enable ost quota
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       # test for user
+       log "User quota (block hardlimit:$global_limit MB)"
+       $LFS setquota -u $TSTUSR -b 0 -B ${global_limit}M -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+
+       pool_add $qpool || error "pool_add failed"
+       pool_add_targets $qpool 0 $(($OSTCOUNT - 1)) ||
+               error "pool_add_targets failed"
+
+       $LFS setquota -u $TSTUSR -B ${limit}M -o $qpool $DIR ||
+               error "set user quota failed"
+
+       # make sure the system is clean
+       local used=$(getquota -u $TSTUSR global curspace)
+       echo "used $used"
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
+
+       used=$(getquota -u $TSTUSR global bhardlimit $qpool)
+
+       $LFS setstripe $testfile -c 1 || error "setstripe $testfile failed"
+       chown $TSTUSR.$TSTUSR $testfile || error "chown $testfile failed"
+
+       test_1_check_write $testfile "user" $limit
+
+       rm -f $testfile
+       wait_delete_completed || error "wait_delete_completed failed"
+       sync_all_data || true
+       used=$(getquota -u $TSTUSR global curspace $qpool)
+       [ $used -ne 0 ] && quota_error u $TSTUSR \
+               "user quota isn't released after deletion"
+       resetquota -u $TSTUSR
+
+       # test for group
+       log "--------------------------------------"
+       log "Group quota (block hardlimit:$global_limit MB)"
+       $LFS setquota -g $TSTUSR -b 0 -B ${global_limit}M -i 0 -I 0 $DIR ||
+               error "set group quota failed"
+
+       $LFS setquota -g $TSTUSR -b 0 -B ${limit}M -o $qpool $DIR ||
+               error "set group quota failed"
+
+       testfile="$DIR/$tdir/$tfile-1"
+       # make sure the system is clean
+       used=$(getquota -g $TSTUSR global curspace $qpool)
+       [ $used -ne 0 ] && error "Used space ($used) for group $TSTUSR isn't 0"
+
+       $LFS setstripe $testfile -c 1 || error "setstripe $testfile failed"
+       chown $TSTUSR.$TSTUSR $testfile || error "chown $testfile failed"
+
+       test_1_check_write $testfile "group" $limit
+
+       rm -f $testfile
+       wait_delete_completed || error "wait_delete_completed failed"
+       sync_all_data || true
+       used=$(getquota -g $TSTUSR global curspace $qpool)
+       [ $used -ne 0 ] && quota_error g $TSTUSR \
+                               "Group quota isn't released after deletion"
+       resetquota -g $TSTUSR
+
+       if ! is_project_quota_supported; then
+               echo "Project quota is not supported"
+               cleanup_quota_test
+               return 0
+       fi
+
+       testfile="$DIR/$tdir/$tfile-2"
+       # make sure the system is clean
+       used=$(getquota -p $TSTPRJID global curspace $qpool)
+       [ $used -ne 0 ] &&
+               error "used space($used) for project $TSTPRJID isn't 0"
+
+       # test for Project
+       log "--------------------------------------"
+       log "Project quota (block hardlimit:$global_limit mb)"
+       $LFS setquota -p $TSTPRJID -b 0 -B ${global_limit}M -i 0 -I 0 $DIR ||
+               error "set project quota failed"
+
+       $LFS setquota -p $TSTPRJID -b 0 -B ${limit}M -o $qpool $DIR ||
+               error "set project quota failed"
+
+
+       $LFS setstripe $testfile -c 1 || error "setstripe $testfile failed"
+       chown $TSTUSR:$TSTUSR $testfile || error "chown $testfile failed"
+       change_project -p $TSTPRJID $testfile
+
+       test_1_check_write $testfile "project" $limit
+
+       # cleanup
+       cleanup_quota_test
+
+       used=$(getquota -p $TSTPRJID global curspace)
+       [ $used -eq 0 ] || quota_error p $TSTPRJID \
                "project quota isn't released after deletion"
 }
-run_test 1 "Block hard limit (normal use and out of quota)"
+run_test 1b "Quota pools: Block hard limit (normal use and out of quota)"
+
+test_1c() {
+       local global_limit=20  # 100M
+       local testfile="$DIR/$tdir/$tfile-0"
+       local qpool1="qpool1"
+       local qpool2="qpool2"
+
+       mds_supports_qp
+       setup_quota_test || error "setup quota failed with $?"
+       stack_trap cleanup_quota_test EXIT
+
+       # enable ost quota
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       # test for user
+       log "User quota (block hardlimit:$global_limit MB)"
+       $LFS setquota -u $TSTUSR -b 0 -B ${global_limit}M -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+
+       pool_add $qpool1 || error "pool_add failed"
+       pool_add_targets $qpool1 0 $(($OSTCOUNT - 1)) ||
+               error "pool_add_targets failed"
+
+       pool_add $qpool2 || error "pool_add failed"
+       pool_add_targets $qpool2 0 $(($OSTCOUNT - 1)) ||
+               error "pool_add_targets failed"
+
+       # create pools without hard limit
+       # initially such case raised several bugs
+       $LFS setquota -u $TSTUSR -B 0M -o $qpool1 $DIR ||
+               error "set user quota failed"
+
+       $LFS setquota -u $TSTUSR -B 0M -o $qpool2 $DIR ||
+               error "set user quota failed"
+
+       # make sure the system is clean
+       local used=$(getquota -u $TSTUSR global curspace)
+       echo "used $used"
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
+
+       used=$(getquota -u $TSTUSR global bhardlimit $qpool)
+
+       test_1_check_write $testfile "user" $global_limit
+
+       used=$(getquota -u $TSTUSR global curspace $qpool1)
+       echo "qpool1 used $used"
+       used=$(getquota -u $TSTUSR global curspace $qpool2)
+       echo "qpool2 used $used"
+
+       rm -f $testfile
+       wait_delete_completed || error "wait_delete_completed failed"
+       sync_all_data || true
+
+       used=$(getquota -u $TSTUSR global curspace $qpool1)
+       [ $used -ne 0 ] && quota_error u $TSTUSR \
+               "user quota isn't released after deletion"
+       resetquota -u $TSTUSR
+
+       # cleanup
+       cleanup_quota_test
+}
+run_test 1c "Quota pools: check 3 pools with hardlimit only for global"
+
+test_1d() {
+       local limit1=10  # 10M
+       local limit2=12  # 12M
+       local global_limit=20  # 20M
+       local testfile="$DIR/$tdir/$tfile-0"
+       local qpool1="qpool1"
+       local qpool2="qpool2"
+
+       mds_supports_qp
+       setup_quota_test || error "setup quota failed with $?"
+       stack_trap cleanup_quota_test EXIT
+
+       # enable ost quota
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       # test for user
+       log "User quota (block hardlimit:$global_limit MB)"
+       $LFS setquota -u $TSTUSR -b 0 -B ${global_limit}M -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+
+       pool_add $qpool1 || error "pool_add failed"
+       pool_add_targets $qpool1 0 $(($OSTCOUNT - 1)) ||
+               error "pool_add_targets failed"
+
+       pool_add $qpool2 || error "pool_add failed"
+       pool_add_targets $qpool2 0 $(($OSTCOUNT - 1)) ||
+               error "pool_add_targets failed"
+
+       $LFS setquota -u $TSTUSR -B ${limit1}M -o $qpool1 $DIR ||
+               error "set user quota failed"
+
+       $LFS setquota -u $TSTUSR -B ${limit2}M -o $qpool2 $DIR ||
+       error "set user quota failed"
+
+       # make sure the system is clean
+       local used=$(getquota -u $TSTUSR global curspace)
+       echo "used $used"
+       [ $used -ne 0 ] && error "used space($used) for user $TSTUSR isn't 0."
+
+       used=$(getquota -u $TSTUSR global bhardlimit $qpool)
+
+       test_1_check_write $testfile "user" $limit1
+
+       used=$(getquota -u $TSTUSR global curspace $qpool1)
+       echo "qpool1 used $used"
+       used=$(getquota -u $TSTUSR global curspace $qpool2)
+       echo "qpool2 used $used"
+
+       rm -f $testfile
+       wait_delete_completed || error "wait_delete_completed failed"
+       sync_all_data || true
+
+       used=$(getquota -u $TSTUSR global curspace $qpool1)
+       [ $used -ne 0 ] && quota_error u $TSTUSR \
+               "user quota isn't released after deletion"
+       resetquota -u $TSTUSR
+
+       # cleanup
+       cleanup_quota_test
+}
+run_test 1d "Quota pools: check block hardlimit on different pools"
+
+test_1e() {
+       local limit1=10  # 10M
+       local global_limit=200  # 200M
+       local testfile="$DIR/$tdir/$tfile-0"
+       local testfile2="$DIR/$tdir/$tfile-1"
+       local qpool1="qpool1"
+
+       mds_supports_qp
+       setup_quota_test || error "setup quota failed with $?"
+       stack_trap cleanup_quota_test EXIT
+
+       # enable ost quota
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       # global_limit is much greater than limit1 to get
+       # different qunits on the OSTs. Since the 1st qunit
+       # shrinking happens on OST1 (which belongs to qpool1),
+       # that qunit should be sent to OST1.
+       log "User quota (block hardlimit:$global_limit MB)"
+       $LFS setquota -u $TSTUSR -b 0 -B ${global_limit}M -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+
+       pool_add $qpool1 || error "pool_add failed"
+       pool_add_targets $qpool1 1 1 ||
+               error "pool_add_targets failed"
+
+       $LFS setquota -u $TSTUSR -B ${limit1}M -o $qpool1 $DIR ||
+               error "set user quota failed"
+
+       # make sure the system is clean
+       local used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
+
+       $LFS setstripe $testfile -c 1 -i 1 || error "setstripe $testfile failed"
+       chown $TSTUSR.$TSTUSR $testfile || error "chown $testfile failed"
+
+       test_1_check_write $testfile "user" $limit1
+
+       $LFS setstripe $testfile2 -c 1 -i 0 ||
+               error "setstripe $testfile2 failed"
+       chown $TSTUSR.$TSTUSR $testfile2 || error "chown $testfile2 failed"
+       # Now write to file with a stripe on OST0, that doesn't belong to qpool1
+       log "Write..."
+       $RUNAS $DD of=$testfile2 count=20 ||
+               quota_error $short_qtype $TSTUSR \
+                       "$qtype write failure, but expect success"
+
+       rm -f $testfile
+       rm -f $testfile2
+       wait_delete_completed || error "wait_delete_completed failed"
+       sync_all_data || true
+
+       used=$(getquota -u $TSTUSR global curspace $qpool1)
+       [ $used -ne 0 ] && quota_error u $TSTUSR \
+               "user quota isn't released after deletion"
+       resetquota -u $TSTUSR
+
+       # cleanup
+       cleanup_quota_test
+}
+run_test 1e "Quota pools: global pool high block limit vs quota pool with small"
+
+test_1f() {
+       local global_limit=200  # 200M
+       local limit1=10  # 10M
+       local TESTDIR="$DIR/$tdir/"
+       local testfile="$TESTDIR/$tfile-0"
+       local qpool1="qpool1"
+
+       mds_supports_qp
+       setup_quota_test || error "setup quota failed with $?"
+       stack_trap cleanup_quota_test EXIT
+
+       # enable ost quota
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       log "User quota (block hardlimit:$global_limit MB)"
+       $LFS setquota -u $TSTUSR -b 0 -B ${global_limit}M -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+
+       pool_add $qpool1 || error "pool_add failed"
+       pool_add_targets $qpool1 0 0 ||
+               error "pool_add_targets failed"
+
+       $LFS setquota -u $TSTUSR -B ${limit1}M -o $qpool1 $DIR ||
+               error "set user quota failed"
+
+       # make sure the system is clean
+       local used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
+
+       $LFS setstripe $TESTDIR -c 1 -i 0 || error "setstripe $TESTDIR failed"
+
+       test_1_check_write $testfile "user" $limit1
+
+       pool_remove_target $qpool1 0
+       rm -f $testfile
+       wait_delete_completed || error "wait_delete_completed failed"
+       sync_all_data || true
+
+       pool_add_targets $qpool1 0 0 || error "pool_add_targets failed"
+       # The qunit for the appropriate element in the lgd array should be
+       # set correctly (4096). Earlier it was not changed, remaining 1024.
+       # This caused the write to hang when it hit limit1 - qunit shrinking
+       # to 1024 for the qpool1 lqe didn't change the qunit for OST0 in the
+       # gld array as it was already 1024. As the "need_update" flag for
+       # this qunit was not set, the new qunit wasn't sent to OST0. Thus
+       # revoke was not set for the "qpool1" lqe and it couldn't set EDQUOT
+       # despite granted becoming > 10M. QMT returned EINPROGRESS in a loop.
+       # Check that it doesn't hang anymore.
+       test_1_check_write $testfile "user" $limit1
+
+       # cleanup
+       cleanup_quota_test
+}
+run_test 1f "Quota pools: correct qunit after removing/adding OST"
 
 # test inode hardlimit
 test_2() {
@@ -729,34 +1096,35 @@ test_2() {
 run_test 2 "File hard limit (normal use and out of quota)"
 
 test_block_soft() {
-       local TESTFILE=$1
-       local GRACE=$2
-       local LIMIT=$3
+       local testfile=$1
+       local grace=$2
+       local limit=$3
        local OFFSET=0
        local qtype=$4
+       local pool=$5
 
        setup_quota_test
-       trap cleanup_quota_test EXIT
+       stack_trap cleanup_quota_test EXIT
 
-       $LFS setstripe $TESTFILE -c 1 -i 0
-       chown $TSTUSR.$TSTUSR $TESTFILE
+       $LFS setstripe $testfile -c 1 -i 0
+       chown $TSTUSR.$TSTUSR $testfile
        [ "$qtype" == "p" ] && is_project_quota_supported &&
-               change_project -p $TSTPRJID $TESTFILE
+               change_project -p $TSTPRJID $testfile
 
        echo "Write up to soft limit"
-       $RUNAS $DD of=$TESTFILE count=$LIMIT ||
+       $RUNAS $DD of=$testfile count=$limit ||
                quota_error a $TSTUSR "write failure, but expect success"
-       OFFSET=$((LIMIT * 1024))
+       OFFSET=$((limit * 1024))
        cancel_lru_locks osc
 
        echo "Write to exceed soft limit"
-       $RUNAS dd if=/dev/zero of=$TESTFILE bs=1K count=10 seek=$OFFSET ||
+       $RUNAS dd if=/dev/zero of=$testfile bs=1K count=10 seek=$OFFSET ||
                quota_error a $TSTUSR "write failure, but expect success"
        OFFSET=$((OFFSET + 1024)) # make sure we don't write to same block
        cancel_lru_locks osc
 
        echo "mmap write when over soft limit"
-       $RUNAS $MULTIOP $TESTFILE.mmap OT40960SMW ||
+       $RUNAS $MULTIOP $testfile.mmap OT40960SMW ||
                quota_error a $TSTUSR "mmap write failure, but expect success"
        cancel_lru_locks osc
 
@@ -768,12 +1136,12 @@ test_block_soft() {
        $SHOW_QUOTA_INFO_PROJID
 
        echo "Write before timer goes off"
-       $RUNAS dd if=/dev/zero of=$TESTFILE bs=1K count=10 seek=$OFFSET ||
+       $RUNAS dd if=/dev/zero of=$testfile bs=1K count=10 seek=$OFFSET ||
                quota_error a $TSTUSR "write failure, but expect success"
        OFFSET=$((OFFSET + 1024))
        cancel_lru_locks osc
 
-       wait_grace_time $qtype "block"
+       wait_grace_time $qtype "block" $pool
 
        $SHOW_QUOTA_USER
        $SHOW_QUOTA_GROUP
@@ -782,12 +1150,13 @@ test_block_soft() {
        $SHOW_QUOTA_INFO_GROUP
        $SHOW_QUOTA_INFO_PROJID
 
-       echo "Write after timer goes off"
+       log "Write after timer goes off"
        # maybe cache write, ignore.
-       $RUNAS dd if=/dev/zero of=$TESTFILE bs=1K count=10 seek=$OFFSET || true
+       $RUNAS dd if=/dev/zero of=$testfile bs=1K count=10 seek=$OFFSET || true
        OFFSET=$((OFFSET + 1024))
        cancel_lru_locks osc
-       $RUNAS dd if=/dev/zero of=$TESTFILE bs=1K count=10 seek=$OFFSET &&
+       log "Write after cancel lru locks"
+       $RUNAS dd if=/dev/zero of=$testfile bs=1K count=10 seek=$OFFSET &&
                quota_error a $TSTUSR "write success, but expect EDQUOT"
 
        $SHOW_QUOTA_USER
@@ -798,7 +1167,7 @@ test_block_soft() {
        $SHOW_QUOTA_INFO_PROJID
 
        echo "Unlink file to stop timer"
-       rm -f $TESTFILE
+       rm -f $testfile
        wait_delete_completed
        sync_all_data || true
 
@@ -809,84 +1178,249 @@ test_block_soft() {
        $SHOW_QUOTA_INFO_GROUP
        $SHOW_QUOTA_INFO_PROJID
 
-       $LFS setstripe $TESTFILE -c 1 -i 0
-       chown $TSTUSR.$TSTUSR $TESTFILE
-       [ "$qtype" == "p" ] && change_project -p $TSTPRJID $TESTFILE
+       $LFS setstripe $testfile -c 1 -i 0
+       chown $TSTUSR.$TSTUSR $testfile
+       [ "$qtype" == "p" ] && change_project -p $TSTPRJID $testfile
 
        echo "Write ..."
-       $RUNAS $DD of=$TESTFILE count=$LIMIT ||
+       $RUNAS $DD of=$testfile count=$limit ||
                quota_error a $TSTUSR "write failure, but expect success"
        # cleanup
        cleanup_quota_test
 }
 
 # block soft limit
-test_3() {
-       local GRACE=20 # 20s
+test_3a() {
+       local grace=20 # 20s
        if [ $(facet_fstype $SINGLEMDS) = "zfs" ]; then
-           GRACE=60
+           grace=60
        fi
-       local TESTFILE=$DIR/$tdir/$tfile-0
+       local testfile=$DIR/$tdir/$tfile-0
 
        # get minimum soft qunit size
-       local LIMIT=$(( $(do_facet $SINGLEMDS $LCTL get_param -n \
+       local limit=$(( $(do_facet $SINGLEMDS $LCTL get_param -n \
                qmt.$FSNAME-QMT0000.dt-0x0.soft_least_qunit) / 1024 ))
 
        set_ost_qtype $QTYPE || error "enable ost quota failed"
 
-       echo "User quota (soft limit:$LIMIT MB  grace:$GRACE seconds)"
+       echo "User quota (soft limit:$limit MB  grace:$grace seconds)"
        # make sure the system is clean
-       local USED=$(getquota -u $TSTUSR global curspace)
-       [ $USED -ne 0 ] && error "Used space($USED) for user $TSTUSR isn't 0."
+       local used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
 
-       $LFS setquota -t -u --block-grace $GRACE --inode-grace \
+       $LFS setquota -t -u --block-grace $grace --inode-grace \
                $MAX_IQ_TIME $DIR || error "set user grace time failed"
-       $LFS setquota -u $TSTUSR -b ${LIMIT}M -B 0 -i 0 -I 0 $DIR ||
+       $LFS setquota -u $TSTUSR -b ${limit}M -B 0 -i 0 -I 0 $DIR ||
                error "set user quota failed"
 
-       test_block_soft $TESTFILE $GRACE $LIMIT "u"
+       test_block_soft $testfile $grace $limit "u"
 
-       echo "Group quota (soft limit:$LIMIT MB  grace:$GRACE seconds)"
-       TESTFILE=$DIR/$tdir/$tfile-1
+       echo "Group quota (soft limit:$limit MB  grace:$grace seconds)"
+       testfile=$DIR/$tdir/$tfile-1
        # make sure the system is clean
-       USED=$(getquota -g $TSTUSR global curspace)
-       [ $USED -ne 0 ] && error "Used space($USED) for group $TSTUSR isn't 0."
+       used=$(getquota -g $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for group $TSTUSR isn't 0."
 
-       $LFS setquota -t -g --block-grace $GRACE --inode-grace \
+       $LFS setquota -t -g --block-grace $grace --inode-grace \
                $MAX_IQ_TIME $DIR || error "set group grace time failed"
-       $LFS setquota -g $TSTUSR -b ${LIMIT}M -B 0 -i 0 -I 0 $DIR ||
+       $LFS setquota -g $TSTUSR -b ${limit}M -B 0 -i 0 -I 0 $DIR ||
                error "set group quota failed"
 
-       test_block_soft $TESTFILE $GRACE $LIMIT "g"
+       test_block_soft $testfile $grace $limit "g"
 
        if is_project_quota_supported; then
-               echo "Project quota (soft limit:$LIMIT MB  grace:$GRACE sec)"
-               TESTFILE=$DIR/$tdir/$tfile-2
+               echo "Project quota (soft limit:$limit MB  grace:$grace sec)"
+               testfile=$DIR/$tdir/$tfile-2
                # make sure the system is clean
-               USED=$(getquota -p $TSTPRJID global curspace)
-               [ $USED -ne 0 ] && error \
-                       "Used space($USED) for project $TSTPRJID isn't 0."
+               used=$(getquota -p $TSTPRJID global curspace)
+               [ $used -ne 0 ] && error \
+                       "Used space($used) for project $TSTPRJID isn't 0."
+
+               $LFS setquota -t -p --block-grace $grace --inode-grace \
+                       $MAX_IQ_TIME $DIR ||
+                               error "set project grace time failed"
+               $LFS setquota -p $TSTPRJID -b ${limit}M -B 0 -i 0 -I 0 \
+                       $DIR || error "set project quota failed"
+
+               test_block_soft $testfile $grace $limit "p"
+               resetquota -p $TSTPRJID
+               $LFS setquota -t -p --block-grace $MAX_DQ_TIME --inode-grace \
+                       $MAX_IQ_TIME $DIR ||
+                               error "restore project grace time failed"
+       fi
+
+       # cleanup
+       $LFS setquota -t -u --block-grace $MAX_DQ_TIME --inode-grace \
+               $MAX_IQ_TIME $DIR || error "restore user grace time failed"
+       $LFS setquota -t -g --block-grace $MAX_DQ_TIME --inode-grace \
+               $MAX_IQ_TIME $DIR || error "restore group grace time failed"
+}
+run_test 3a "Block soft limit (start timer, timer goes off, stop timer)"
+
+test_3b() {
+       local grace=20 # 20s
+       local qpool="qpool1"
+       if [ $(facet_fstype $SINGLEMDS) = "zfs" ]; then
+               grace=60
+       fi
+       local testfile=$DIR/$tdir/$tfile-0
 
-               $LFS setquota -t -p --block-grace $GRACE --inode-grace \
+       mds_supports_qp
+       # get minimum soft qunit size
+       local limit=$(( $(do_facet $SINGLEMDS $LCTL get_param -n \
+               qmt.$FSNAME-QMT0000.dt-0x0.soft_least_qunit) / 1024 ))
+       local glbl_limit=$((2*limit))
+       local glbl_grace=$((2*grace))
+       echo "limit $limit glbl_limit $glbl_limit"
+       echo "grace $grace glbl_grace $glbl_grace"
+
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       echo "User quota in $qpool(soft limit:$limit MB  grace:$grace seconds)"
+       # make sure the system is clean
+       local used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
+
+       pool_add $qpool || error "pool_add failed"
+       pool_add_targets $qpool 0 1 ||
+               error "pool_add_targets failed"
+
+       $LFS setquota -t -u --block-grace $glbl_grace --inode-grace \
+               $MAX_IQ_TIME $DIR || error "set user grace time failed"
+       $LFS setquota -t -u --block-grace $grace \
+               -o $qpool $DIR || error "set user grace time failed"
+
+       $LFS setquota -u $TSTUSR -b ${glbl_limit}M -B 0 -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+       $LFS setquota -u $TSTUSR -b ${limit}M -B 0 -o $qpool $DIR ||
+               error "set user quota failed"
+
+       test_block_soft $testfile $grace $limit "u" $qpool
+
+       echo "Group quota in $qpool(soft limit:$limit MB  grace:$grace seconds)"
+       testfile=$DIR/$tdir/$tfile-1
+       # make sure the system is clean
+       used=$(getquota -g $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for group $TSTUSR isn't 0."
+
+       $LFS setquota -t -g --block-grace $glbl_grace --inode-grace \
+               $MAX_IQ_TIME $DIR || error "set group grace time failed"
+       $LFS setquota -t -g --block-grace $grace \
+               -o $qpool $DIR || error "set group grace time failed"
+
+       $LFS setquota -g $TSTUSR -b ${glbl_limit}M -B 0 -i 0 -I 0 $DIR ||
+               error "set group quota failed"
+       $LFS setquota -g $TSTUSR -b ${limit}M -B 0 -o $qpool $DIR ||
+               error "set group quota failed"
+
+       test_block_soft $testfile $grace $limit "g" $qpool
+
+       if is_project_quota_supported; then
+               echo "Project quota in $qpool(soft:$limit MB  grace:$grace sec)"
+               testfile=$DIR/$tdir/$tfile-2
+               # make sure the system is clean
+               used=$(getquota -p $TSTPRJID global curspace)
+               [ $used -ne 0 ] && error \
+                       "Used space($used) for project $TSTPRJID isn't 0."
+
+               $LFS setquota -t -p --block-grace $glbl_grace --inode-grace \
                        $MAX_IQ_TIME $DIR ||
                                error "set project grace time failed"
-               $LFS setquota -p $TSTPRJID -b ${LIMIT}M -B 0 -i 0 -I 0 \
+               $LFS setquota -t -p --block-grace $grace \
+                       -o $qpool $DIR || error "set project grace time failed"
+
+               $LFS setquota -p $TSTPRJID -b ${glbl_limit}M -B 0 -i 0 -I 0 \
                        $DIR || error "set project quota failed"
+               $LFS setquota -p $TSTPRJID -b ${limit}M -B 0 -o $qpool $DIR ||
+                       error "set project quota failed"
 
-               test_block_soft $TESTFILE $GRACE $LIMIT "p"
+               test_block_soft $testfile $grace $limit "p" $qpool
                resetquota -p $TSTPRJID
                $LFS setquota -t -p --block-grace $MAX_DQ_TIME --inode-grace \
                        $MAX_IQ_TIME $DIR ||
                                error "restore project grace time failed"
+               $LFS setquota -t -p --block-grace $MAX_DQ_TIME -o $qpool $DIR ||
+                       error "set project grace time failed"
        fi
 
        # cleanup
        $LFS setquota -t -u --block-grace $MAX_DQ_TIME --inode-grace \
                $MAX_IQ_TIME $DIR || error "restore user grace time failed"
+       $LFS setquota -t -u --block-grace $MAX_DQ_TIME \
+               -o $qpool $DIR || error "restore user grace time failed"
        $LFS setquota -t -g --block-grace $MAX_DQ_TIME --inode-grace \
                $MAX_IQ_TIME $DIR || error "restore group grace time failed"
+       $LFS setquota -t -g --block-grace $MAX_DQ_TIME \
+               -o $qpool $DIR || error "restore group grace time failed"
+}
+run_test 3b "Quota pools: Block soft limit (start timer, expires, stop timer)"
+
+test_3c() {
+       local grace=20 # 20s
+       local qpool="qpool1"
+       local qpool2="qpool2"
+       if [ $(facet_fstype $SINGLEMDS) = "zfs" ]; then
+               grace=60
+       fi
+       local testfile=$DIR/$tdir/$tfile-0
+
+       mds_supports_qp
+       # get minimum soft qunit size
+       local limit=$(( $(do_facet $SINGLEMDS $LCTL get_param -n \
+               qmt.$FSNAME-QMT0000.dt-0x0.soft_least_qunit) / 1024 ))
+       local limit2=$((limit+4))
+       local glbl_limit=$((limit+8))
+       local grace1=$((grace+10))
+       local grace2=$grace
+       local glbl_grace=$((grace+20))
+       echo "limit $limit limit2 $limit2 glbl_limit $glbl_limit"
+       echo "grace1 $grace1 grace2 $grace2 glbl_grace $glbl_grace"
+
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       echo "User quota in qpool2(soft:$limit2 MB grace:$grace2 seconds)"
+       # make sure the system is clean
+       local used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
+
+       pool_add $qpool || error "pool_add failed"
+       pool_add_targets $qpool 0 1 ||
+               error "pool_add_targets failed"
+
+       pool_add $qpool2 || error "pool_add failed"
+       pool_add_targets $qpool2 0 1 ||
+               error "pool_add_targets failed"
+
+
+       $LFS setquota -t -u --block-grace $glbl_grace --inode-grace \
+               $MAX_IQ_TIME $DIR || error "set user grace time failed"
+       $LFS setquota -t -u --block-grace $grace1 \
+               -o $qpool $DIR || error "set user grace time failed"
+       $LFS setquota -t -u --block-grace $grace2 \
+               -o $qpool2 $DIR || error "set user grace time failed"
+
+       $LFS setquota -u $TSTUSR -b ${glbl_limit}M -B 0 -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+       $LFS setquota -u $TSTUSR -b ${limit}M -B 0 -o $qpool $DIR ||
+               error "set user quota failed"
+       # qpool has the minimum soft limit, but its grace period is
+       # greater than that of qpool2. Thus a write shouldn't fail when
+       # it hits the qpool soft limit - only when it exceeds the qpool2
+       # limit after grace2 seconds.
+       $LFS setquota -u $TSTUSR -b ${limit2}M -B 0 -o $qpool2 $DIR ||
+               error "set user quota failed"
+
+       test_block_soft $testfile $grace2 $limit2 "u" $qpool2
+
+       # cleanup
+       $LFS setquota -t -u --block-grace $MAX_DQ_TIME --inode-grace \
+               $MAX_IQ_TIME $DIR || error "restore user grace time failed"
+       $LFS setquota -t -u --block-grace $MAX_DQ_TIME \
+               -o $qpool $DIR || error "restore user grace time failed"
+       $LFS setquota -t -u --block-grace $MAX_DQ_TIME \
+               -o $qpool2 $DIR || error "restore user grace time failed"
 }
-run_test 3 "Block soft limit (start timer, timer goes off, stop timer)"
+run_test 3c "Quota pools: check block soft limit on different pools"
 
 test_file_soft() {
        local TESTFILE=$1
@@ -3537,6 +4071,330 @@ test_66() {
 }
 run_test 66 "nonroot user can not change project state in default"
 
+test_67_write() {
+       local file="$1"
+       local qtype="$2"
+       local size=$3
+       local _runas=""
+       local short_qtype=${qtype:0:1}
+
+       echo "file "$file
+       echo "0 $0 1 $1 2 $2 3 $3 4 $4"
+       case "$4" in
+               quota_usr)  _runas=$RUNAS;;
+               quota_2usr) _runas=$RUNAS2;;
+               *)          error "unknown quota parameter $4";;
+       esac
+
+       log "Write..."
+       date
+       $_runas $DD of=$file count=$size ||
+               quota_error $short_qtype $TSTUSR \
+                       "$qtype write failure, but expect success"
+       date
+       cancel_lru_locks osc
+       date
+       sync; sync_all_data || true
+       date
+}
+
+getgranted() {
+       local pool=$1
+       local ptype=$2
+       local userid=$3
+       local qtype=$4
+       local param=qmt.$FSNAME-QMT0000.$ptype-$pool.glb-$qtype
+
+       do_facet mds1 $LCTL get_param $param |
+               grep -A2 $userid | awk -F'[, ]*' 'NR==2{print $9}'
+}
+
+test_67() {
+       local limit=20  # 20M
+       local testfile="$DIR/$tdir/$tfile-0"
+       local testfile2="$DIR/$tdir/$tfile-1"
+       local testfile3="$DIR/$tdir/$tfile-2"
+       local qpool="qpool1"
+       local used
+       local granted
+       local granted_mb
+
+       mds_supports_qp
+       [ "$ost1_FSTYPE" == zfs ] &&
+               skip "ZFS grants some block space together with inode"
+
+       setup_quota_test || error "setup quota failed with $?"
+       trap cleanup_quota_test EXIT
+
+       # enable ost quota
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       # test for user
+       log "User quota (block hardlimit:$limit MB)"
+       $LFS setquota -u $TSTUSR -b 0 -B ${limit}M -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+
+       # make sure the system is clean
+       used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && error "Used space($used) for user $TSTUSR isn't 0."
+
+       granted=$(getgranted "0x0" "dt" $TSTID "usr")
+       echo "granted 0x0 before write $granted"
+
+       # trigger reintegration
+       local procf="osd-$(facet_fstype ost1).$FSNAME-OST*."
+       procf=${procf}quota_slave.force_reint
+       do_facet ost1 $LCTL set_param $procf=1 ||
+               error "force reintegration failed"
+       wait_ost_reint "u" || error "reintegration failed"
+       granted=$(getgranted "0x0" "dt" $TSTID "usr")
+       [ $granted -ne 0 ] &&
+               error "Granted($granted) for $TSTUSR in $qpool isn't 0."
+
+       $LFS setstripe $testfile -c 1 -i 0 || error "setstripe $testfile failed"
+       chown $TSTUSR.$TSTUSR $testfile || error "chown $testfile failed"
+
+       # write 10 MB to testfile
+       test_67_write "$testfile" "user" 10 "quota_usr"
+
+       # create qpool and add OST1
+       pool_add $qpool || error "pool_add failed"
+       pool_add_targets $qpool 1 1 || error "pool_add_targets failed"
+       # as quota_usr has no limits, the lqe may be absent. But it
+       # should be created after the 1st direct qmt_get.
+       used=$(getquota -u $TSTUSR global bhardlimit $qpool)
+
+       # check granted - should be 0, as testfile is located only on OST0
+       granted=$(getgranted "0x0" "dt" $TSTID "usr")
+       echo "global granted $granted"
+       granted=$(getgranted $qpool "dt" $TSTID "usr")
+       echo "$qpool granted $granted"
+       [ $granted -ne 0 ] &&
+               error "Granted($granted) for $TSTUSR in $qpool isn't 0."
+
+       # add OST0 to qpool and check granted space
+       pool_add_targets $qpool 0 1 ||
+               error "pool_add_targets failed"
+       granted_mb=$(($(getgranted $qpool "dt" $TSTID "usr")/1024))
+       echo "Granted $granted_mb MB"
+       # should be 10M + qunit for each OST
+       [ $granted_mb -ge 10 -a $granted_mb -lt $limit ] ||
+               error "Granted($granted_mb) for $TSTUSR in $qpool is wrong."
+
+       $LFS setstripe $testfile2 -c 1 -i 1 ||
+               error "setstripe $testfile2 failed"
+       chown $TSTUSR2.$TSTUSR2 $testfile2 || error "chown $testfile2 failed"
+       # Write from another user and check that qpool1 shows correct
+       # granted space, even though quota_2usr has no limits in qpool1.
+       test_67_write "$testfile2" "user" 10 "quota_2usr"
+       used=$(getquota -u $TSTUSR2 global curspace $qpool)
+       granted=$(getgranted $qpool "dt" $TSTID2 "usr")
+       [ $granted -ne 0 ] &&
+               error "Granted($granted) for $TSTUSR2 in $qpool isn't 0."
+
+       # Granted space for quota_2usr in qpool1 should appear only
+       # when global lqe for this user becomes enforced.
+       $LFS setquota -u $TSTUSR2 -B ${limit}M $DIR ||
+               error "set user quota failed"
+       granted_mb=$(($(getgranted $qpool "dt" $TSTID2 "usr")/1024))
+       echo "granted_mb $granted_mb"
+       [ $granted_mb -ge 10 -a $granted_mb -lt $limit ] ||
+               error "Granted($granted) for $TSTUSR in $qpool is wrong."
+
+       $LFS setstripe $testfile3 -c 1 -i 0 ||
+               error "setstripe $testfile3 failed"
+       chown $TSTUSR2.$TSTUSR2 $testfile3 || error "chown $testfile3 failed"
+       test_67_write "$testfile3" "user" 10 "quota_2usr"
+       granted_mb=$(($(getgranted $qpool "dt" $TSTID2 "usr")/1024))
+       echo "$testfile3 granted_mb $granted_mb"
+       [ $granted_mb -eq $limit ] ||
+               error "Granted($granted_mb) for $TSTUSR2 is not equal to 20M"
+
+       # remove OST0 from qpool1 and check granted space:
+       # should be 0 for TSTUSR and 10M for TSTUSR2
+       pool_remove_target $qpool 0
+       granted_mb=$(($(getgranted $qpool "dt" $TSTID "usr")/1024))
+       [ $granted_mb -eq 0 ] ||
+               error "Granted($granted_mb) for $TSTUSR in $qpool != 0."
+       granted_mb=$(($(getgranted $qpool "dt" $TSTID2 "usr")/1024))
+       [ $granted_mb -eq 10 ] ||
+               error "Granted($granted_mb) for $TSTUSR2 is not equal to 10M"
+
+       rm -f $testfile
+       wait_delete_completed || error "wait_delete_completed failed"
+       sync_all_data || true
+       used=$(getquota -u $TSTUSR global curspace)
+       [ $used -ne 0 ] && quota_error u $TSTUSR \
+               "user quota isn't released after deletion"
+       resetquota -u $TSTUSR
+
+       cleanup_quota_test
+}
+run_test 67 "quota pools recalculation"
+
+get_slave_nr() {
+       local pool=$1
+       local qtype=$2
+       local nr
+
+       do_facet mds1 $LCTL get_param -n qmt.$FSNAME-QMT0000.dt-$pool.info |
+               awk '/usr/ {getline; print $2}'
+}
+
+test_68()
+{
+       local qpool="qpool1"
+
+       mds_supports_qp
+       setup_quota_test || error "setup quota failed with $?"
+       stack_trap cleanup_quota_test EXIT
+
+       # enable ost quota
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       # check slave number for the global pool
+       local nr=$(get_slave_nr "0x0" "usr")
+       echo "nr result $nr"
+       [[ $nr != $((OSTCOUNT + MDSCOUNT)) ]] &&
+               error "Slave_nr $nr for global pool != ($OSTCOUNT + $MDSCOUNT)"
+
+       # create qpool and add OST1
+       pool_add $qpool || error "pool_add failed"
+       nr=$(get_slave_nr $qpool "usr")
+       [[ $nr != 0 ]] && error "Slave number $nr for $qpool != 0"
+
+       # add OST1 to qpool
+       pool_add_targets $qpool 1 1 || error "pool_add_targets failed"
+       nr=$(get_slave_nr $qpool "usr")
+       [[ $nr != 1 ]] && error "Slave number $nr for $qpool != 1"
+
+       # add OST0 to qpool
+       pool_add_targets $qpool 0 1 || error "pool_add_targets failed"
+       nr=$(get_slave_nr $qpool "usr")
+       [[ $nr != 2 ]] && error "Slave number $nr for $qpool != 2"
+
+       # remove OST0
+       pool_remove_target $qpool 0
+       nr=$(get_slave_nr $qpool "usr")
+       [[ $nr != 1 ]] && error "Slave number $nr for $qpool != 1"
+
+       # remove OST1
+       pool_remove_target $qpool 1
+       nr=$(get_slave_nr $qpool "usr")
+       [[ $nr != 0 ]] && error "Slave number $nr for $qpool != 0"
+
+       # Check again that all is fine with global pool
+       nr=$(get_slave_nr "0x0" "usr")
+       [[ $nr != $((OSTCOUNT + MDSCOUNT)) ]] &&
+               error "Slave_nr $nr for global pool != ($OSTCOUNT + $MDSCOUNT)"
+
+       cleanup_quota_test
+}
+run_test 68 "slave number in quota pool changed after each add/remove OST"
+
+# A DOM (Data-on-MDT) file belongs only to the global pool, so exhausting
+# the qpool1 limit on its OST must not cause EDQUOT for writes to DOM.
+test_69()
+{
+       local global_limit=200  # 200M
+       local limit=10  # 10M
+       local testfile="$DIR/$tdir/$tfile-0"
+       local dom0="$DIR/$tdir/dom0"
+       local qpool="qpool1"
+
+       mds_supports_qp
+       setup_quota_test || error "setup quota failed with $?"
+       stack_trap cleanup_quota_test EXIT
+
+       # enable ost quota
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+       set_mdt_qtype $QTYPE || error "enable mdt quota failed"
+
+       # Save DOM only at MDT0
+       $LFS setdirstripe -c 1 -i 0 $dom0 || error "cannot create $dom0"
+       $LFS setstripe -E 1M $dom0 -L mdt || error "setstripe to $dom0 failed"
+       chmod 0777 $dom0
+       $LFS setstripe -c 1 -i 0 "$DIR/$tdir/"
+
+       # create qpool and add OST0
+       pool_add $qpool || error "pool_add failed"
+       pool_add_targets $qpool 0 0 || error "pool_add_targets failed"
+
+       log "User quota (block hardlimit:$global_limit MB)"
+       $LFS setquota -u $TSTUSR -b 0 -B ${global_limit}M -i 0 -I 0 $DIR ||
+               error "set user quota failed"
+
+       log "User quota (block hardlimit:$limit MB)"
+       $LFS setquota -u $TSTUSR -B ${limit}M -o $qpool $DIR ||
+               error "set user quota failed"
+
+       # DOM writes should succeed - well under the global limit
+       $RUNAS dd if=/dev/zero of="$dom0/f1" bs=1K count=512 oflag=sync ||
+               quota_error u $TSTUSR "write failed"
+
+       $RUNAS dd if=/dev/zero of="$dom0/f1" bs=1K count=512 seek=512 \
+               oflag=sync || quota_error u $TSTUSR "write failed"
+
+       # consume the whole qpool1 limit on OST0
+       $RUNAS $DD of=$testfile count=$limit || true
+
+       # flush cache, ensure noquota flag is set on client
+       cancel_lru_locks osc
+       sync; sync_all_data || true
+
+       # MDT0 shouldn't get EDQUOT with glimpse.
+       $RUNAS $DD of=$testfile count=$limit seek=$limit &&
+               quota_error u $TSTUSR \
+                       "user write success, but expect EDQUOT"
+
+       # Now all members of qpool1 should get EDQUOT. Expect success
+       # when write to DOM on MDT0, as it belongs to global pool.
+       $RUNAS dd if=/dev/zero of="$dom0/f1" bs=1K count=512 \
+               oflag=sync || quota_error u $TSTUSR "write failed"
+
+       $RUNAS dd if=/dev/zero of="$dom0/f1" bs=1K count=512 seek=512 \
+               oflag=sync || quota_error u $TSTUSR "write failed"
+
+       cleanup_quota_test
+}
+
+test_70()
+{
+       local qpool="qpool1"
+       local limit=20
+       local err=0
+       local bhard
+
+       [[ CLIENT_VERSION -lt $(version_code $VERSION_WITH_QP) ]] &&
+               skip "Needs a client >= $VERSION_WITH_QP"
+
+       setup_quota_test || error "setup quota failed with $?"
+       stack_trap cleanup_quota_test EXIT
+
+       # MDS returns EFAULT for unsupported quotactl command
+       [[ $MDS1_VERSION -lt $(version_code $VERSION_WITH_QP) ]] && err=14
+
+       # create qpool and add OST0
+       pool_add $qpool || error "pool_add failed"
+       pool_add_targets $qpool 0 0 || error "pool_add_targets failed"
+
+       $LFS setquota -u $TSTUSR -B ${limit}M -o $qpool $DIR
+       rc=$?
+       [ $rc -eq $err ] || error "setquota res $rc != $err"
+
+       # If MDS supports QP, check that limit was set properly.
+       if [[ $MDS1_VERSION -ge $(version_code $VERSION_WITH_QP) ]]; then
+               bhard=$(getquota -u $TSTUSR global bhardlimit $qpool)
+               echo "hard limit $bhard limit $limit"
+               [ $bhard -ne $((limit*1024)) ] &&
+                       error "bhard:$bhard for $qpool!=$((limit*1024))"
+       fi
+
+       $LFS quota -u $TSTUSR --pool $qpool $DIR
+       rc=$?
+       [ $rc -eq $err ] || error "quota res $rc != $err"
+
+       cleanup_quota_test
+}
+run_test 70 "check lfs setquota/quota with a pool option"
+
 quota_fini()
 {
        do_nodes $(comma_list $(nodes_list)) "lctl set_param debug=-quota"
index 5868472..0249c43 100755 (executable)
@@ -9043,10 +9043,21 @@ pool_file_rel_path() {
 
 pool_remove_first_target() {
        echo "Removing first target from a pool"
+       # index -1 makes pool_remove_target pick the first listed target
+       pool_remove_target $1 -1
+}
+
+pool_remove_target() {
        local pool=$1
+       local index=$2
 
        local pname="lov.$FSNAME-*.pools.$pool"
-       local t=$($LCTL get_param -n $pname | head -1)
+       if [ $index -eq -1 ]; then
+               local t=$($LCTL get_param -n $pname | head -1)
+       else
+               local t=$(printf "$FSNAME-OST%04x_UUID" $index)
+       fi
+
+       echo "Removing $t from $pool"
        do_facet mgs $LCTL pool_remove $FSNAME.$pool $t
        for mds_id in $(seq $MDSCOUNT); do
                local mdt_id=$((mds_id-1))
index bcad3e6..4407f56 100644 (file)
@@ -6506,28 +6506,38 @@ static inline int has_times_option(int argc, char **argv)
         return 0;
 }
 
-int lfs_setquota_times(int argc, char **argv)
+/* Return 1 (printing an error) if the pool name exceeds LOV_MAXPOOLNAME,
+ * 0 otherwise.  Validates the passed-in name, not the global optarg. */
+static inline int lfs_verify_poolarg(char *pool)
 {
-        int c, rc;
-        struct if_quotactl qctl;
-        char *mnt, *obd_type = (char *)qctl.obd_type;
-        struct obd_dqblk *dqb = &qctl.qc_dqblk;
-        struct obd_dqinfo *dqi = &qctl.qc_dqinfo;
-        struct option long_opts[] = {
+       if (strnlen(pool, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME) {
+               fprintf(stderr,
+                       "Pool name '%.*s' is longer than %d\n",
+                       LOV_MAXPOOLNAME, pool, LOV_MAXPOOLNAME);
+               return 1;
+       }
+       return 0;
+}
+
+/* Handle "lfs setquota -t": set block/inode grace times for the quota type
+ * selected by -u/-g/-p.  With -o <pool> the command is switched to
+ * LUSTRE_Q_SETINFOPOOL so the grace times apply to the named quota pool.
+ * NOTE(review): qctl is now caller-supplied; presumably the caller zeroes
+ * it before the call (the old local memset was dropped) - confirm. */
+int lfs_setquota_times(int argc, char **argv, struct if_quotactl *qctl)
+{
+       int c, rc;
+       char *mnt, *obd_type = (char *)qctl->obd_type;
+       struct obd_dqblk *dqb = &qctl->qc_dqblk;
+       struct obd_dqinfo *dqi = &qctl->qc_dqinfo;
+       struct option long_opts[] = {
 { .val = 'b',   .name = "block-grace",  .has_arg = required_argument },
 { .val = 'g',   .name = "group",        .has_arg = no_argument },
 { .val = 'i',   .name = "inode-grace",  .has_arg = required_argument },
 { .val = 'p',   .name = "projid",       .has_arg = no_argument },
 { .val = 't',   .name = "times",        .has_arg = no_argument },
 { .val = 'u',   .name = "user",         .has_arg = no_argument },
+       { .val = 'o',   .name = "pool",         .has_arg = required_argument },
 { .name = NULL } };
        int qtype;
 
-       memset(&qctl, 0, sizeof(qctl));
-       qctl.qc_cmd  = LUSTRE_Q_SETINFO;
-       qctl.qc_type = ALLQUOTA;
+       qctl->qc_cmd  = LUSTRE_Q_SETINFO;
+       qctl->qc_type = ALLQUOTA;
 
-       while ((c = getopt_long(argc, argv, "b:gi:ptu",
+       while ((c = getopt_long(argc, argv, "b:gi:ptuo:",
                                long_opts, NULL)) != -1) {
                switch (c) {
                case 'u':
@@ -6539,12 +6549,12 @@ int lfs_setquota_times(int argc, char **argv)
                case 'p':
                        qtype = PRJQUOTA;
 quota_type:
-                       if (qctl.qc_type != ALLQUOTA) {
+                       if (qctl->qc_type != ALLQUOTA) {
                                fprintf(stderr, "error: -u/g/p can't be used "
                                                 "more than once\n");
                                return CMD_HELP;
                        }
-                       qctl.qc_type = qtype;
+                       qctl->qc_type = qtype;
                        break;
                case 'b':
                        if (strncmp(optarg, NOTIFY_GRACE,
@@ -6576,33 +6586,41 @@ quota_type:
                        break;
                case 't': /* Yes, of course! */
                        break;
+               /* -o/--pool: apply grace times to the named quota pool */
+               case 'o':
+                       if (lfs_verify_poolarg(optarg))
+                               return -1;
+                       fprintf(stdout,
+                               "Trying to set grace for pool %s\n", optarg);
+                       strncpy(qctl->qc_poolname, optarg, LOV_MAXPOOLNAME);
+                       qctl->qc_cmd  = LUSTRE_Q_SETINFOPOOL;
+                       break;
                /* getopt prints error message for us when opterr != 0 */
                default:
                        return CMD_HELP;
                }
        }
 
-       if (qctl.qc_type == ALLQUOTA) {
+       if (qctl->qc_type == ALLQUOTA) {
                fprintf(stderr, "error: neither -u, -g nor -p specified\n");
-                return CMD_HELP;
-        }
+               return CMD_HELP;
+       }
 
-        if (optind != argc - 1) {
-                fprintf(stderr, "error: unexpected parameters encountered\n");
-                return CMD_HELP;
-        }
+       if (optind != argc - 1) {
+               fprintf(stderr, "error: unexpected parameters encountered\n");
+               return CMD_HELP;
+       }
 
-        mnt = argv[optind];
-        rc = llapi_quotactl(mnt, &qctl);
-        if (rc) {
-                if (*obd_type)
-                        fprintf(stderr, "%s %s ", obd_type,
-                                obd_uuid2str(&qctl.obd_uuid));
-                fprintf(stderr, "setquota failed: %s\n", strerror(-rc));
-                return rc;
-        }
+       mnt = argv[optind];
+       rc = llapi_quotactl(mnt, qctl);
+       if (rc) {
+               if (*obd_type)
+                       fprintf(stderr, "%s %s ", obd_type,
+                               obd_uuid2str(&qctl->obd_uuid));
+               fprintf(stderr, "setquota failed: %s\n", strerror(-rc));
+               return rc;
+       }
 
-        return 0;
+       return 0;
 }
 
 #define BSLIMIT (1 << 0)
@@ -6613,9 +6631,9 @@ quota_type:
 int lfs_setquota(int argc, char **argv)
 {
        int c, rc = 0;
-       struct if_quotactl qctl;
-       char *mnt, *obd_type = (char *)qctl.obd_type;
-       struct obd_dqblk *dqb = &qctl.qc_dqblk;
+       struct if_quotactl *qctl;
+       char *mnt, *obd_type;
+       struct obd_dqblk *dqb;
        struct option long_opts[] = {
        { .val = 'b',   .name = "block-softlimit",
                                                .has_arg = required_argument },
@@ -6632,79 +6650,93 @@ int lfs_setquota(int argc, char **argv)
        { .val = 'P',   .name = "default-prj",  .has_arg = no_argument },
        { .val = 'u',   .name = "user",         .has_arg = required_argument },
        { .val = 'U',   .name = "default-usr",  .has_arg = no_argument },
+       { .val = 'o',   .name = "pool",         .has_arg = required_argument },
        { .name = NULL } };
        unsigned limit_mask = 0;
        char *endptr;
        bool use_default = false;
-     &nbs