Whamcloud - gitweb
LU-1842 quota: add quotactl support on qmt
authorJohann Lombardi <johann.lombardi@intel.com>
Tue, 2 Oct 2012 16:11:21 +0000 (18:11 +0200)
committerOleg Drokin <green@whamcloud.com>
Thu, 4 Oct 2012 18:59:31 +0000 (14:59 -0400)
This patch implements quotactl ({get,set}{quota,info}) support on the
QMT. This involves defining the lqe operations for the qmt as well as
pool operations in order to support per-pool quota in the future.

Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Change-Id: I122e7cb85eb6d1d3460e86217e2a6723763c8cbf
Reviewed-on: http://review.whamcloud.com/4160
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Mike Pershin <tappro@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
12 files changed:
lustre/ldlm/ldlm_resource.c
lustre/quota/Makefile.in
lustre/quota/lproc_quota.c
lustre/quota/lquota_disk.c
lustre/quota/lquota_entry.c
lustre/quota/lquota_internal.h
lustre/quota/lquota_lib.c
lustre/quota/qmt_dev.c
lustre/quota/qmt_entry.c [new file with mode: 0644]
lustre/quota/qmt_handler.c
lustre/quota/qmt_internal.h
lustre/quota/qmt_pool.c [new file with mode: 0644]

index c484953..a55e947 100644 (file)
@@ -936,6 +936,7 @@ void ldlm_namespace_get(struct ldlm_namespace *ns)
 {
         cfs_atomic_inc(&ns->ns_bref);
 }
+EXPORT_SYMBOL(ldlm_namespace_get);
 
 void ldlm_namespace_put(struct ldlm_namespace *ns)
 {
@@ -944,6 +945,7 @@ void ldlm_namespace_put(struct ldlm_namespace *ns)
                 cfs_spin_unlock(&ns->ns_lock);
         }
 }
+EXPORT_SYMBOL(ldlm_namespace_put);
 
 /* Register @ns in the list of namespaces */
 void ldlm_namespace_register(struct ldlm_namespace *ns, ldlm_side_t client)
index 193f727..7b61887 100644 (file)
@@ -1,6 +1,6 @@
 MODULES := lquota
 
-qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o
+qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o qmt_entry.o qmt_pool.o
 
 quota-objs := lproc_quota.o lquota_lib.o lquota_disk.o lquota_entry.o
 
index 581cf63..9c65526 100644 (file)
  * in the LICENSE file that accompanied this code).
  *
  * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA
  *
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012 Intel, Inc.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, Whamcloud, Inc.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LQUOTA
@@ -167,20 +161,22 @@ static void *lprocfs_quota_seq_next(struct seq_file *p, void *v, loff_t *pos)
 /*
  * Output example:
  *
- * user_accounting:
+ * usr_accounting:
  * - id:      0
- *   usage:   { inodes:                  209, bytes:             26161152 }
+ *   usage:   { inodes:                  209, kbytes:             2616 }
  * - id:      840000038
- *   usage:   { inodes:                    1, bytes:             10485760 }
+ *   usage:   { inodes:                    1, kbytes:             1048 }
  */
 static int lprocfs_quota_seq_show(struct seq_file *p, void *v)
 {
-       struct lquota_procfs    *lqp = p->private;
-       const struct dt_it_ops  *iops;
-       struct dt_it            *it;
-       struct dt_key           *key;
-       struct lquota_acct_rec   rec;
-       int                      rc;
+       struct lquota_procfs            *lqp = p->private;
+       struct lquota_thread_info       *qti = lquota_info(&lqp->lqp_env);
+       const struct dt_it_ops          *iops;
+       struct dt_it                    *it;
+       struct dt_key                   *key;
+       struct dt_rec                   *rec = (struct dt_rec *)&qti->qti_rec;
+       const struct lu_fid             *fid;
+       int                              rc;
 
        LASSERT(lqp);
        if (lqp->lqp_obj == NULL) {
@@ -188,14 +184,26 @@ static int lprocfs_quota_seq_show(struct seq_file *p, void *v)
                return 0;
        }
 
-       if (v == SEQ_START_TOKEN) {
-               const struct lu_fid *fid = lu_object_fid(&lqp->lqp_obj->do_lu);
+       fid = lu_object_fid(&lqp->lqp_obj->do_lu);
 
-               LASSERT(fid_is_acct(fid));
-               if (fid_oid(fid) == ACCT_USER_OID)
-                       seq_printf(p, "user_accounting:\n");
-               else
-                       seq_printf(p, "group_accounting:\n");
+       if (v == SEQ_START_TOKEN) {
+               if (fid_is_acct(fid)) {
+                       if (fid_oid(fid) == ACCT_USER_OID)
+                               seq_printf(p, "usr_accounting:\n");
+                       else
+                               seq_printf(p, "grp_accounting:\n");
+               } else if (fid_seq(fid) == FID_SEQ_QUOTA_GLB) {
+                       int     poolid, rtype, qtype;
+
+                       rc = lquota_extract_fid(fid, &poolid, &rtype, &qtype);
+                       if (rc)
+                               return rc;
+
+                       seq_printf(p, "global_pool%d_%s_%s\n", poolid,
+                                  RES_NAME(rtype), QTYPE_NAME(qtype));
+               } else {
+                       return -ENOTSUPP;
+               }
                return 0;
        }
 
@@ -210,7 +218,7 @@ static int lprocfs_quota_seq_show(struct seq_file *p, void *v)
                return PTR_ERR(key);
        }
 
-       rc = iops->rec(&lqp->lqp_env, it, (struct dt_rec *)&rec, 0);
+       rc = iops->rec(&lqp->lqp_env, it, rec, 0);
        if (rc) {
                CERROR("%s: failed to get rec: rc = %d\n",
                       lqp->lqp_obj->do_lu.lo_dev->ld_obd->obd_name, rc);
@@ -218,8 +226,19 @@ static int lprocfs_quota_seq_show(struct seq_file *p, void *v)
        }
 
        seq_printf(p, "- %-8s %llu\n", "id:", *((__u64 *)key));
-       seq_printf(p, "  %-8s { inodes: %20"LPF64"u, bytes: %20"LPF64"u }\n",
-                  "usage:", rec.ispace, rec.bspace);
+       if (fid_is_acct(fid))
+               seq_printf(p, "  %-8s { inodes: %20"LPF64"u, kbytes: %20"LPF64
+                          "u }\n", "usage:",
+                          ((struct lquota_acct_rec *)rec)->ispace,
+                          toqb(((struct lquota_acct_rec *)rec)->bspace));
+       else if (fid_seq(fid) == FID_SEQ_QUOTA_GLB)
+               seq_printf(p, "  %-8s { hard: %20"LPF64"u, soft: %20"LPF64
+                          "u, granted: %20"LPF64"u, time: %20"LPF64"u }\n",
+                          "limits:",
+                          ((struct lquota_glb_rec *)rec)->qbr_hardlimit,
+                          ((struct lquota_glb_rec *)rec)->qbr_softlimit,
+                          ((struct lquota_glb_rec *)rec)->qbr_granted,
+                          ((struct lquota_glb_rec *)rec)->qbr_time);
        return 0;
 }
 
index 6421bc4..777b2ab 100644 (file)
@@ -92,18 +92,6 @@ lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
        obj = dt_locate(env, dev, &qti->qti_fid);
        if (IS_ERR(obj))
                GOTO(out, obj);
-
-       /* install index operation vector */
-       if (obj->do_index_ops == NULL) {
-               rc = obj->do_ops->do_index_try(env, obj, idx_feat);
-               if (rc) {
-                       CERROR("%s: fail to setup index operations for "DFID
-                              " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
-                              PFID(lu_object_fid(&obj->do_lu)), rc);
-                       lu_object_put(env, &obj->do_lu);
-                       obj = ERR_PTR(rc);
-               }
-       }
 out:
        local_oid_storage_fini(env, los);
        RETURN(obj);
@@ -291,10 +279,26 @@ struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
                                                              idx_feat);
        }
 
-       if (IS_ERR(glb_idx))
+       if (IS_ERR(glb_idx)) {
                CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
                       "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
                       PFID(fid), PTR_ERR(glb_idx), local);
+               RETURN(glb_idx);
+       }
+
+       /* install index operation vector */
+       if (glb_idx->do_index_ops == NULL) {
+               int rc;
+
+               rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat);
+               if (rc) {
+                       CERROR("%s: failed to setup index operations for "DFID
+                              " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+                              PFID(lu_object_fid(&glb_idx->do_lu)), rc);
+                       lu_object_put(env, &glb_idx->do_lu);
+                       glb_idx = ERR_PTR(rc);
+               }
+       }
 
        RETURN(glb_idx);
 }
@@ -435,6 +439,22 @@ struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
                                                  qti->qti_buf);
        }
 
+       if (IS_ERR(slv_idx))
+               RETURN(slv_idx);
+
+       /* install index operation vector */
+       if (slv_idx->do_index_ops == NULL) {
+               rc = slv_idx->do_ops->do_index_try(env, slv_idx,
+                                                  &dt_quota_slv_features);
+               if (rc) {
+                       CERROR("%s: failed to setup index operations for "DFID
+                              " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+                              PFID(lu_object_fid(&slv_idx->do_lu)), rc);
+                       lu_object_put(env, &slv_idx->do_lu);
+                       slv_idx = ERR_PTR(rc);
+               }
+       }
+
        RETURN(slv_idx);
 }
 
index 830e9cf..787ca84 100644 (file)
@@ -41,7 +41,6 @@
 static int hash_lqs_cur_bits = HASH_LQE_CUR_BITS;
 CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444,
                 "the current bits of lqe hash");
-cfs_mem_cache_t *lqe_kmem;
 
 static unsigned lqe64_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
index b75e5ce..7fedd29 100644 (file)
@@ -32,6 +32,7 @@
 #define _LQUOTA_INTERNAL_H
 
 #define QTYPE_NAME(qtype) ((qtype) == USRQUOTA ? "usr" : "grp")
+#define RES_NAME(res) ((res) == LQUOTA_RES_MD ? "md" : "dt")
 
 #define QIF_IFLAGS (QIF_INODES | QIF_ITIME | QIF_ILIMITS)
 #define QIF_BFLAGS (QIF_SPACE | QIF_BTIME | QIF_BLIMITS)
@@ -254,6 +255,14 @@ static inline void lqe_read_unlock(struct lquota_entry *lqe)
                cfs_read_unlock(&lqe->lqe_lock);
 }
 
+/*
+ * Helper functions & prototypes
+ */
+
+/* minimum qunit size, 1K inode for metadata pool and 1MB for data pool */
+#define LQUOTA_LEAST_QUNIT(type) \
+       ((type) == LQUOTA_RES_MD ? (1 << 10) : toqb(PTLRPC_MAX_BRW_SIZE))
+
 /* Common data shared by quota-level handlers. This is allocated per-thread to
  * reduce stack consumption */
 struct lquota_thread_info {
@@ -335,8 +344,9 @@ void lquota_lqe_debug0(struct lquota_entry *lqe,
 struct dt_object *acct_obj_lookup(const struct lu_env *, struct dt_device *,
                                  int);
 void lquota_generate_fid(struct lu_fid *, int, int, int);
-int lquota_extract_fid(struct lu_fid *, int *, int *, int *);
+int lquota_extract_fid(const struct lu_fid *, int *, int *, int *);
 const struct dt_index_features *glb_idx_feature(struct lu_fid *);
+extern cfs_mem_cache_t *lqe_kmem;
 
 /* lquota_entry.c */
 /* site create/destroy */
index 67145e3..bc0716e 100644 (file)
 
 #include "lquota_internal.h"
 
+cfs_mem_cache_t *lqe_kmem;
+
+struct lu_kmem_descr lquota_caches[] = {
+       {
+               .ckd_cache = &lqe_kmem,
+               .ckd_name  = "lqe_kmem",
+               .ckd_size  = sizeof(struct lquota_entry)
+       },
+       {
+               .ckd_cache = NULL
+       }
+};
+
 /* register lquota key */
 LU_KEY_INIT_FINI(lquota, struct lquota_thread_info);
 LU_CONTEXT_KEY_DEFINE(lquota, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL);
@@ -244,7 +257,7 @@ void lquota_generate_fid(struct lu_fid *fid, int pool_id, int pool_type,
  * Helper routine used to extract pool ID, pool type and quota type from a
  * given FID.
  */
-int lquota_extract_fid(struct lu_fid *fid, int *pool_id, int *pool_type,
+int lquota_extract_fid(const struct lu_fid *fid, int *pool_id, int *pool_type,
                       int *quota_type)
 {
        unsigned int     tmp;
@@ -338,10 +351,14 @@ static int __init init_lquota(void)
 #warning "remove old quota compatibility code"
 #endif
 
-       rc = qmt_glb_init();
+       rc = lu_kmem_init(lquota_caches);
        if (rc)
                GOTO(out_key, rc);
 
+       rc = qmt_glb_init();
+       if (rc)
+               GOTO(out_caches, rc);
+
        rc = qsd_glb_init();
        if (rc)
                GOTO(out_qmt, rc);
@@ -350,6 +367,8 @@ static int __init init_lquota(void)
 
 out_qmt:
        qmt_glb_fini();
+out_caches:
+       lu_kmem_fini(lquota_caches);
 out_key:
        lu_context_key_degister(&lquota_thread_key);
        return rc;
@@ -359,6 +378,7 @@ static void exit_lquota(void)
 {
        qsd_glb_fini();
        qmt_glb_fini();
+       lu_kmem_fini(lquota_caches);
        lu_context_key_degister(&lquota_thread_key);
 }
 
index e165ae7..701cc16 100644 (file)
@@ -99,12 +99,29 @@ static struct lu_device *qmt_device_fini(const struct lu_env *env,
        LASSERT(qmt != NULL);
 
        CDEBUG(D_QUOTA, "%s: initiating QMT shutdown\n", qmt->qmt_svname);
+       qmt->qmt_stopping = true;
+
+       /* remove qmt proc entry */
+       if (qmt->qmt_proc != NULL && !IS_ERR(qmt->qmt_proc)) {
+               lprocfs_remove(&qmt->qmt_proc);
+               qmt->qmt_proc = NULL;
+       }
+
+       /* kill pool instances, if any */
+       qmt_pool_fini(env, qmt);
 
        /* disconnect from OSD */
        if (qmt->qmt_child_exp != NULL) {
                obd_disconnect(qmt->qmt_child_exp);
                qmt->qmt_child_exp = NULL;
-               qmt->qmt_child     = NULL;
+               qmt->qmt_child = NULL;
+       }
+
+       /* release reference on MDT namespace */
+       if (ld->ld_obd->obd_namespace != NULL) {
+               ldlm_namespace_put(ld->ld_obd->obd_namespace);
+               ld->ld_obd->obd_namespace = NULL;
+               qmt->qmt_ns = NULL;
        }
 
        RETURN(NULL);
@@ -159,7 +176,7 @@ static int qmt_connect_to_osd(const struct lu_env *env, struct qmt_device *qmt,
        /* initialize site (although it isn't used anywhere) and lu_device
         * pointer to next device */
        qmt->qmt_child = lu2dt_dev(qmt->qmt_child_exp->exp_obd->obd_lu_dev);
-       ld->ld_site    = qmt->qmt_child_exp->exp_obd->obd_lu_dev->ld_site;
+       ld->ld_site = qmt->qmt_child_exp->exp_obd->obd_lu_dev->ld_site;
        EXIT;
 out:
        if (data)
@@ -187,7 +204,8 @@ static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt,
                            struct lu_device_type *ldt, struct lustre_cfg *cfg)
 {
        struct lu_device        *ld = qmt2lu_dev(qmt);
-       struct obd_device       *obd;
+       struct obd_device       *obd, *mdt_obd;
+       struct obd_type         *type;
        int                      rc;
        ENTRY;
 
@@ -204,11 +222,42 @@ static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt,
        obd->obd_lu_dev = ld;
        ld->ld_obd      = obd;
 
+       /* look-up the parent MDT to steal its ldlm namespace ... */
+       mdt_obd = class_name2obd(lustre_cfg_string(cfg, 2));
+       if (mdt_obd == NULL)
+               RETURN(-ENOENT);
+
+       /* grab reference on MDT namespace. kind of a hack until we have our
+        * own namespace & service threads */
+       LASSERT(mdt_obd->obd_namespace != NULL);
+       ldlm_namespace_get(mdt_obd->obd_namespace);
+       obd->obd_namespace = mdt_obd->obd_namespace;
+       qmt->qmt_ns = obd->obd_namespace;
+
        /* connect to backend osd device */
        rc = qmt_connect_to_osd(env, qmt, cfg);
        if (rc)
                GOTO(out, rc);
 
+       /* at the moment there is no linkage between lu_type and obd_type, so
+        * we lookup obd_type this way */
+       type = class_search_type(LUSTRE_QMT_NAME);
+       LASSERT(type != NULL);
+
+       /* register proc directory associated with this qmt */
+       qmt->qmt_proc = lprocfs_register(qmt->qmt_svname, type->typ_procroot,
+                                        NULL, NULL);
+       if (IS_ERR(qmt->qmt_proc)) {
+               rc = PTR_ERR(qmt->qmt_proc);
+               CERROR("%s: failed to create qmt proc entry (%d)\n",
+                      qmt->qmt_svname, rc);
+               GOTO(out, rc);
+       }
+
+       /* initialize pool configuration */
+       rc = qmt_pool_init(env, qmt);
+       if (rc)
+               GOTO(out, rc);
        EXIT;
 out:
        if (rc)
@@ -372,7 +421,27 @@ static int qmt_device_prepare(const struct lu_env *env,
                              struct lu_device *parent,
                              struct lu_device *ld)
 {
-       return 0;
+       struct qmt_device       *qmt = lu2qmt_dev(ld);
+       struct dt_object        *qmt_root;
+       int                      rc;
+       ENTRY;
+
+       /* initialize quota master root directory where all index files will be
+        * stored */
+       qmt_root = lquota_disk_dir_find_create(env, qmt->qmt_child, NULL,
+                                              QMT_DIR);
+       if (IS_ERR(qmt_root)) {
+               rc = PTR_ERR(qmt_root);
+               CERROR("%s: failed to create master quota directory (%d)\n",
+                      qmt->qmt_svname, rc);
+               RETURN(rc);
+       }
+
+       /* initialize on-disk indexes associated with each pool */
+       rc = qmt_pool_prepare(env, qmt, qmt_root);
+
+       lu_object_put(env, &qmt_root->do_lu);
+       RETURN(rc);
 }
 
 /*
diff --git a/lustre/quota/qmt_entry.c b/lustre/quota/qmt_entry.c
new file mode 100644 (file)
index 0000000..ee57900
--- /dev/null
@@ -0,0 +1,414 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu    Yawei    <yawei.niu@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include "qmt_internal.h"
+
+/*
+ * Initialize qmt-specific fields of quota entry.
+ *
+ * \param lqe - is the quota entry to initialize
+ * \param arg - is the pointer to the qmt_pool_info structure
+ */
+static void qmt_lqe_init(struct lquota_entry *lqe, void *arg)
+{
+       LASSERT(lqe_is_master(lqe));
+
+       lqe->lqe_revoke_time = 0;
+       cfs_init_rwsem(&lqe->lqe_sem);
+}
+
+/*
+ * Update a lquota entry. This is done by reading quota settings from the global
+ * index. The lquota entry must be write locked.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the quota entry to refresh
+ * \param arg - is the pointer to the qmt_pool_info structure
+ */
+static int qmt_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
+                       void *arg)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    *pool = (struct qmt_pool_info *)arg;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(lqe_is_master(lqe));
+
+       /* read record from disk */
+       rc = lquota_disk_read(env, pool->qpi_glb_obj[lqe->lqe_site->lqs_qtype],
+                             &lqe->lqe_id, (struct dt_rec *)&qti->qti_glb_rec);
+
+       switch (rc) {
+       case -ENOENT:
+               /* no such entry, assume quota isn't enforced for this user */
+               lqe->lqe_enforced = false;
+               break;
+       case 0:
+               /* copy quota settings from on-disk record */
+               lqe->lqe_granted   = qti->qti_glb_rec.qbr_granted;
+               lqe->lqe_hardlimit = qti->qti_glb_rec.qbr_hardlimit;
+               lqe->lqe_softlimit = qti->qti_glb_rec.qbr_softlimit;
+               lqe->lqe_gracetime = qti->qti_glb_rec.qbr_time;
+
+               if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
+                       /* {hard,soft}limit=0 means no quota enforced */
+                       lqe->lqe_enforced = false;
+               else
+                       lqe->lqe_enforced  = true;
+
+               break;
+       default:
+               LQUOTA_ERROR(lqe, "failed to read quota entry from disk, rc:%d",
+                            rc);
+               RETURN(rc);
+       }
+
+       LQUOTA_DEBUG(lqe, "read");
+       RETURN(0);
+}
+
+/*
+ * Print lqe information for debugging.
+ *
+ * \param lqe - is the quota entry to debug
+ * \param arg - is the pointer to the qmt_pool_info structure
+ * \param msgdata - debug message
+ * \param fmt     - format of debug message
+ */
+static void qmt_lqe_debug(struct lquota_entry *lqe, void *arg,
+                         struct libcfs_debug_msg_data *msgdata,
+                         const char *fmt, va_list args)
+{
+       struct qmt_pool_info    *pool = (struct qmt_pool_info *)arg;
+
+       libcfs_debug_vmsg2(msgdata, fmt, args,
+                          "qmt:%s pool:%d-%s id:"LPU64" enforced:%d hard:"LPU64
+                          " soft:"LPU64" granted:"LPU64" time:"LPU64" qunit:"
+                          LPU64" edquot:%d revoke:"LPU64"\n",
+                          pool->qpi_qmt->qmt_svname,
+                          pool->qpi_key & 0x0000ffff,
+                          RES_NAME(pool->qpi_key >> 16),
+                          lqe->lqe_id.qid_uid, lqe->lqe_enforced,
+                          lqe->lqe_hardlimit, lqe->lqe_softlimit,
+                          lqe->lqe_granted, lqe->lqe_gracetime,
+                          lqe->lqe_qunit, lqe->lqe_edquot,
+                          lqe->lqe_revoke_time);
+}
+
+/*
+ * Vector of quota entry operations supported on the master
+ */
+struct lquota_entry_operations qmt_lqe_ops = {
+       .lqe_init       = qmt_lqe_init,
+       .lqe_read       = qmt_lqe_read,
+       .lqe_debug      = qmt_lqe_debug,
+};
+
+/*
+ * Reserve enough credits to update records in both the global index and
+ * the slave index identified by \slv_obj
+ *
+ * \param env     - is the environment passed by the caller
+ * \param lqe     - is the quota entry associated with the identifier
+ *                  subject to the change
+ * \param slv_obj - is the slave index object or NULL (global update only)
+ * \param restore - is a temporary storage for current quota settings which will
+ *                  be restored if something goes wrong at index update time.
+ */
+struct thandle *qmt_trans_start_with_slv(const struct lu_env *env,
+                                        struct lquota_entry *lqe,
+                                        struct dt_object *slv_obj,
+                                        struct qmt_lqe_restore *restore)
+{
+       struct qmt_device       *qmt;
+       struct thandle          *th;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(lqe != NULL);
+       LASSERT(lqe_is_master(lqe));
+
+       qmt = lqe2qpi(lqe)->qpi_qmt;
+
+       if (slv_obj != NULL)
+               LQUOTA_DEBUG(lqe, "declare write for slv "DFID,
+                            PFID(lu_object_fid(&slv_obj->do_lu)));
+
+       /* start transaction */
+       th = dt_trans_create(env, qmt->qmt_child);
+       if (IS_ERR(th))
+               RETURN(th);
+
+       if (slv_obj == NULL)
+               /* quota settings on master are updated synchronously for the
+                * time being */
+               th->th_sync = 1;
+
+       /* reserve credits for global index update */
+       rc = lquota_disk_declare_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id);
+       if (rc)
+               GOTO(out, rc);
+
+       if (slv_obj != NULL) {
+               /* reserve credits for slave index update */
+               rc = lquota_disk_declare_write(env, th, slv_obj, &lqe->lqe_id);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       /* start transaction */
+       rc = dt_trans_start_local(env, qmt->qmt_child, th);
+       if (rc)
+               GOTO(out, rc);
+
+       EXIT;
+out:
+       if (rc) {
+               dt_trans_stop(env, qmt->qmt_child, th);
+               th = ERR_PTR(rc);
+               /* slv_obj may be NULL when only the global index is updated
+                * (see qmt_trans_start), so don't dereference it here */
+               LQUOTA_ERROR(lqe, "failed to declare write, rc:%d", rc);
+       } else {
+               restore->qlr_hardlimit = lqe->lqe_hardlimit;
+               restore->qlr_softlimit = lqe->lqe_softlimit;
+               restore->qlr_gracetime = lqe->lqe_gracetime;
+               restore->qlr_granted   = lqe->lqe_granted;
+               restore->qlr_qunit     = lqe->lqe_qunit;
+       }
+       return th;
+}
+
+/*
+ * Reserve enough credits to update a record in the global index
+ *
+ * \param env     - is the environment passed by the caller
+ * \param lqe     - is the quota entry to be modified in the global index
+ * \param restore - is a temporary storage for current quota settings which will
+ *                  be restored if something goes wrong at index update time.
+ */
+struct thandle *qmt_trans_start(const struct lu_env *env,
+                               struct lquota_entry *lqe,
+                               struct qmt_lqe_restore *restore)
+{
+       LQUOTA_DEBUG(lqe, "declare write");
+       return qmt_trans_start_with_slv(env, lqe, NULL, restore);
+}
+
+/*
+ * Update record associated with a quota entry in the global index.
+ * If LQUOTA_BUMP_VER is set, then the global index version must also be
+ * bumped.
+ * The entry must be at least read locked, dirty and up-to-date.
+ *
+ * \param env   - the environment passed by the caller
+ * \param th    - is the transaction handle to be used for the disk writes
+ * \param lqe   - is the quota entry to update
+ * \param obj   - is the dt_object associated with the index file
+ * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
+ * \param ver   - is used to return the new version of the index.
+ *
+ * \retval      - 0 on success and lqe dirty flag cleared,
+ *                appropriate error on failure and uptodate flag cleared.
+ */
+int qmt_glb_write(const struct lu_env *env, struct thandle *th,
+                 struct lquota_entry *lqe, __u32 flags, __u64 *ver)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct lquota_glb_rec   *rec;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(lqe != NULL);
+       LASSERT(lqe_is_master(lqe));
+       LASSERT(lqe_is_locked(lqe));
+       LASSERT(lqe->lqe_uptodate);
+       LASSERT((flags & ~(LQUOTA_BUMP_VER | LQUOTA_SET_VER)) == 0);
+
+       LQUOTA_DEBUG(lqe, "write glb");
+
+       if (!lqe->lqe_enforced && lqe->lqe_granted == 0 &&
+           lqe->lqe_id.qid_uid != 0) {
+               /* quota isn't enforced any more for this entry and there is no
+                * more space granted to slaves, let's just remove the entry
+                * from the index */
+               rec = NULL;
+       } else {
+               rec = &qti->qti_glb_rec;
+
+               /* fill global index with updated quota settings */
+               rec->qbr_granted   = lqe->lqe_granted;
+               rec->qbr_hardlimit = lqe->lqe_hardlimit;
+               rec->qbr_softlimit = lqe->lqe_softlimit;
+               rec->qbr_time      = lqe->lqe_gracetime;
+       }
+
+       /* write new quota settings */
+       rc = lquota_disk_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id,
+                              (struct dt_rec *)rec, flags, ver);
+       if (rc)
+               /* we failed to write the new quota settings to disk, report
+                * error to caller who will restore the initial value */
+               LQUOTA_ERROR(lqe, "failed to update global index, rc:%d", rc);
+
+       RETURN(rc);
+}
+
+/*
+ * Read from disk how much quota space is allocated to a slave.
+ * This is done by reading records from the dedicated slave index file.
+ * Return in \granted how much quota space is currently allocated to the
+ * slave.
+ * The entry must be at least read locked.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the quota entry associated with the identifier to look-up
+ *              in the slave index
+ * \param slv_obj - is the dt_object associated with the slave index
+ * \param granted - is the output parameter where to return how much space
+ *                  is granted to the slave.
+ *
+ * \retval    - 0 on success, appropriate error on failure
+ */
+int qmt_slv_read(const struct lu_env *env, struct lquota_entry *lqe,
+                struct dt_object *slv_obj, __u64 *granted)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct lquota_slv_rec   *slv_rec = &qti->qti_slv_rec;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(lqe != NULL);
+       LASSERT(lqe_is_master(lqe));
+       LASSERT(lqe_is_locked(lqe));
+
+       LQUOTA_DEBUG(lqe, "read slv "DFID,
+                    PFID(lu_object_fid(&slv_obj->do_lu)));
+
+       /* read slave record from disk */
+       rc = lquota_disk_read(env, slv_obj, &lqe->lqe_id,
+                             (struct dt_rec *)slv_rec);
+       switch (rc) {
+       case -ENOENT:
+               *granted = 0;
+               break;
+       case 0:
+               /* extract granted from on-disk record */
+               *granted = slv_rec->qsr_granted;
+               break;
+       default:
+               LQUOTA_ERROR(lqe, "failed to read slave record "DFID,
+                            PFID(lu_object_fid(&slv_obj->do_lu)));
+               RETURN(rc);
+       }
+
+       LQUOTA_DEBUG(lqe, "successful slv read "LPU64, *granted);
+
+       RETURN(0);
+}
+
+/*
+ * Update record in slave index file.
+ * The entry must be at least read locked.
+ *
+ * \param env - the environment passed by the caller
+ * \param th  - is the transaction handle to be used for the disk writes
+ * \param lqe - is the dirty quota entry which will be updated at the same time
+ *              as the slave index
+ * \param slv_obj - is the dt_object associated with the slave index
+ * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
+ * \param ver   - is used to return the new version of the index.
+ * \param granted - is the new amount of quota space owned by the slave
+ *
+ * \retval    - 0 on success, appropriate error on failure
+ */
+int qmt_slv_write(const struct lu_env *env, struct thandle *th,
+                 struct lquota_entry *lqe, struct dt_object *slv_obj,
+                 __u32 flags, __u64 *ver, __u64 granted)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct lquota_slv_rec   *rec;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(lqe != NULL);
+       LASSERT(lqe_is_master(lqe));
+       LASSERT(lqe_is_locked(lqe));
+
+       LQUOTA_DEBUG(lqe, "write slv "DFID" granted:"LPU64,
+                    PFID(lu_object_fid(&slv_obj->do_lu)), granted);
+
+       if (granted == 0) {
+               /* this slave does not own any quota space for this ID any more,
+                * so let's just remove the entry from the index; a NULL record
+                * is how the removal is requested from lquota_disk_write() */
+               rec = NULL;
+       } else {
+               rec = &qti->qti_slv_rec;
+
+               /* update the space granted to this slave */
+               rec->qsr_granted = granted;
+       }
+
+       /* write new granted space */
+       rc = lquota_disk_write(env, th, slv_obj, &lqe->lqe_id,
+                              (struct dt_rec *)rec, flags, ver);
+       if (rc) {
+               LQUOTA_ERROR(lqe, "failed to update slave index "DFID" granted:"
+                            LPU64, PFID(lu_object_fid(&slv_obj->do_lu)),
+                            granted);
+               RETURN(rc);
+       }
+
+       RETURN(0);
+}
+
+/*
+ * Sanity-check new quota limits before they are applied to this pool.
+ * A hard limit of 0 means "no hard limit", in which case any soft limit
+ * is acceptable.
+ *
+ * \param lqe  - is the quota entry subject to the setquota
+ * \param hard - is the new hard limit
+ * \param soft - is the new soft limit
+ *
+ * \retval     - 0 if the limits are consistent, -EINVAL otherwise
+ */
+int qmt_validate_limits(struct lquota_entry *lqe, __u64 hard, __u64 soft)
+{
+       ENTRY;
+
+       /* when a hard limit is set, the soft limit must not exceed it */
+       if (hard == 0 || soft <= hard)
+               RETURN(0);
+       RETURN(-EINVAL);
+}
index c077867..19e706f 100644 (file)
 #include "qmt_internal.h"
 
 /*
+ * Fetch grace time for either inode or block.
+ *
+ * \param env     - is the environment passed by the caller
+ * \param qmt     - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier
+ * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode
+ *                  (i.e. LQUOTA_RES_MD)
+ * \param qtype   - is the quota type
+ * \param time    - is the output variable where to copy the grace time
+ */
+static int qmt_getinfo(const struct lu_env *env, struct qmt_device *qmt,
+                      __u16 pool_id, __u8 restype, __u8 qtype, __u64 *time)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       union lquota_id         *id  = &qti->qti_id_bis;
+       struct lquota_entry     *lqe;
+       ENTRY;
+
+       /* Global grace time is stored in quota settings of ID 0. */
+       id->qid_uid = 0;
+
+       /* look-up quota entry storing grace time */
+       lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
+       if (IS_ERR(lqe))
+               RETURN(PTR_ERR(lqe));
+
+       lqe_read_lock(lqe);
+       LQUOTA_DEBUG(lqe, "getinfo");
+       /* copy grace time */
+       *time = lqe->lqe_gracetime;
+       lqe_read_unlock(lqe);
+
+       /* drop reference taken by qmt_pool_lqe_lookup() */
+       lqe_putref(lqe);
+       RETURN(0);
+}
+
+/*
+ * Update grace time for either inode or block.
+ * Global grace time is stored in quota settings of ID 0.
+ *
+ * \param env     - is the environment passed by the caller
+ * \param qmt     - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier
+ * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode
+ *                  (i.e. LQUOTA_RES_MD)
+ * \param qtype   - is the quota type
+ * \param time    - is the new grace time
+ */
+static int qmt_setinfo(const struct lu_env *env, struct qmt_device *qmt,
+                      __u16 pool_id, __u8 restype, __u8 qtype, __u64 time)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       union lquota_id         *id  = &qti->qti_id_bis;
+       struct lquota_entry     *lqe;
+       struct thandle          *th = NULL;
+       int                      rc;
+       ENTRY;
+
+       /* Global grace time is stored in quota settings of ID 0. */
+       id->qid_uid = 0;
+
+       /* look-up quota entry storing the global grace time */
+       lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
+       if (IS_ERR(lqe))
+               RETURN(PTR_ERR(lqe));
+
+       /* allocate & start transaction with enough credits to update grace
+        * time in the global index file */
+       th = qmt_trans_start(env, lqe, &qti->qti_restore);
+       if (IS_ERR(th))
+               GOTO(out_nolock, rc = PTR_ERR(th));
+
+       /* write lock quota entry storing the grace time */
+       lqe_write_lock(lqe);
+       if (lqe->lqe_gracetime == time)
+               /* grace time is the same */
+               GOTO(out, rc = 0);
+
+       LQUOTA_DEBUG(lqe, "setinfo time:"LPU64, time);
+
+       /* set new grace time */
+       lqe->lqe_gracetime = time;
+       /* always set enforced bit for ID 0 to make sure it does not go away */
+       lqe->lqe_enforced  = true;
+
+       /* write new grace time to disk, no need for version bump */
+       rc = qmt_glb_write(env, th, lqe, 0, NULL);
+       if (rc) {
+               /* restore initial grace time saved by qmt_trans_start() */
+               qmt_restore(lqe, &qti->qti_restore);
+               GOTO(out, rc);
+       }
+       EXIT;
+out:
+       lqe_write_unlock(lqe);
+out_nolock:
+       /* drop reference taken by qmt_pool_lqe_lookup() */
+       lqe_putref(lqe);
+       /* th is an ERR_PTR when qmt_trans_start() failed, hence the check */
+       if (th != NULL && !IS_ERR(th))
+               dt_trans_stop(env, qmt->qmt_child, th);
+       return rc;
+}
+
+/*
+ * Retrieve quota settings for a given identifier.
+ *
+ * \param env     - is the environment passed by the caller
+ * \param qmt     - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier
+ * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode
+ *                  (i.e. LQUOTA_RES_MD)
+ * \param qtype   - is the quota type
+ * \param id      - is the quota identifier for which we want to access quota
+ *                  settings.
+ * \param hard    - is the output variable where to copy the hard limit
+ * \param soft    - is the output variable where to copy the soft limit
+ * \param time    - is the output variable where to copy the grace time
+ */
+static int qmt_getquota(const struct lu_env *env, struct qmt_device *qmt,
+                       __u16 pool_id, __u8 restype, __u8 qtype,
+                       union lquota_id *id, __u64 *hard, __u64 *soft,
+                       __u64 *time)
+{
+       struct lquota_entry     *lqe;
+       ENTRY;
+
+       /* look-up lqe structure containing quota settings */
+       lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
+       if (IS_ERR(lqe))
+               RETURN(PTR_ERR(lqe));
+
+       /* copy quota settings */
+       lqe_read_lock(lqe);
+       LQUOTA_DEBUG(lqe, "getquota");
+       *hard = lqe->lqe_hardlimit;
+       *soft = lqe->lqe_softlimit;
+       *time = lqe->lqe_gracetime;
+       lqe_read_unlock(lqe);
+
+       /* drop reference taken by qmt_pool_lqe_lookup() */
+       lqe_putref(lqe);
+       RETURN(0);
+}
+
+/*
+ * Update quota settings for a given identifier.
+ *
+ * \param env     - is the environment passed by the caller
+ * \param qmt     - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier
+ * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode
+ *                  (i.e. LQUOTA_RES_MD)
+ * \param qtype   - is the quota type
+ * \param id      - is the quota identifier for which we want to modify quota
+ *                  settings.
+ * \param hard    - is the new hard limit
+ * \param soft    - is the new soft limit
+ * \param time    - is the new grace time
+ * \param valid   - is the list of settings to change
+ */
+static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt,
+                       __u16 pool_id, __u8 restype, __u8 qtype,
+                       union lquota_id *id, __u64 hard, __u64 soft, __u64 time,
+                       __u32 valid)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct lquota_entry     *lqe;
+       struct thandle          *th = NULL;
+       __u64                    grace, ver;
+       bool                     dirtied = false, bump_version = false;
+       int                      rc = 0;
+       ENTRY;
+
+       /* fetch global grace time (stored under quota ID 0) */
+       rc = qmt_getinfo(env, qmt, pool_id, restype, qtype, &grace);
+       if (rc)
+               RETURN(rc);
+
+       /* look-up quota entry associated with this ID */
+       lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
+       if (IS_ERR(lqe))
+               RETURN(PTR_ERR(lqe));
+
+       /* allocate & start transaction with enough credits to update quota
+        * settings in the global index file */
+       th = qmt_trans_start(env, lqe, &qti->qti_restore);
+       if (IS_ERR(th))
+               GOTO(out_nolock, rc = PTR_ERR(th));
+
+       lqe_write_lock(lqe);
+       LQUOTA_DEBUG(lqe, "setquota valid:%x hard:"LPU64" soft:"LPU64
+                    " time:"LPU64, valid, hard, soft, time);
+
+       if ((valid & QIF_TIMES) != 0 && lqe->lqe_gracetime != time) {
+               /* change time settings */
+               lqe->lqe_gracetime = time;
+               dirtied            = true;
+       }
+
+       if ((valid & QIF_LIMITS) != 0 &&
+           (lqe->lqe_hardlimit != hard || lqe->lqe_softlimit != soft)) {
+               bool enforced = lqe->lqe_enforced;
+
+               /* reject inconsistent limits (e.g. soft above hard) */
+               rc = qmt_validate_limits(lqe, hard, soft);
+               if (rc)
+                       GOTO(out, rc);
+
+               /* change quota limits */
+               lqe->lqe_hardlimit = hard;
+               lqe->lqe_softlimit = soft;
+
+               /* clear grace time */
+               if (lqe->lqe_softlimit == 0 ||
+                   lqe->lqe_granted <= lqe->lqe_softlimit)
+                       /* no soft limit or below soft limit, let's clear grace
+                        * time */
+                       lqe->lqe_gracetime = 0;
+               else if ((valid & QIF_TIMES) == 0)
+                       /* set grace only if user hasn't provided his own */
+                        lqe->lqe_gracetime = cfs_time_current_sec() + grace;
+
+               /* change enforced status based on new parameters */
+               if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
+                       lqe->lqe_enforced = false;
+               else
+                       lqe->lqe_enforced = true;
+
+               if ((enforced && !lqe->lqe_enforced) ||
+                   (!enforced && lqe->lqe_enforced))
+                       /* if enforced status has changed, we need to inform
+                        * slave, therefore we need to bump the version */
+                        bump_version = true;
+
+               dirtied = true;
+       }
+
+       if (dirtied) {
+               /* write new quota settings to disk */
+               rc = qmt_glb_write(env, th, lqe,
+                                  bump_version ? LQUOTA_BUMP_VER : 0, &ver);
+               if (rc) {
+                       /* restore initial quota settings saved by
+                        * qmt_trans_start() */
+                       qmt_restore(lqe, &qti->qti_restore);
+                       GOTO(out, rc);
+               }
+       }
+       EXIT;
+out:
+       lqe_write_unlock(lqe);
+out_nolock:
+       /* drop reference taken by qmt_pool_lqe_lookup() */
+       lqe_putref(lqe);
+
+       /* th is an ERR_PTR when qmt_trans_start() failed, hence the check */
+       if (th != NULL && !IS_ERR(th))
+               dt_trans_stop(env, qmt->qmt_child, th);
+
+       return rc;
+}
+
+/*
  * Handle quotactl request.
  *
  * \param env   - is the environment passed by the caller
 static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
                        struct obd_quotactl *oqctl)
 {
+       struct qmt_thread_info  *qti = qmt_info(env);
+       union lquota_id         *id  = &qti->qti_id;
        struct qmt_device       *qmt = lu2qmt_dev(ld);
+       struct obd_dqblk        *dqb = &oqctl->qc_dqblk;
        int                      rc = 0;
        ENTRY;
 
@@ -59,25 +319,109 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
 
        switch (oqctl->qc_cmd) {
 
-       case Q_GETINFO:
-       case Q_SETINFO:
-       case Q_SETQUOTA:
-               /* XXX: not implemented yet. */
-               CERROR("quotactl operation %d not implemented yet\n",
-                      oqctl->qc_cmd);
-               RETURN(-EOPNOTSUPP);
-
-       case Q_GETQUOTA:
-               /* XXX: return no limit for now, just for testing purpose */
-               memset(&oqctl->qc_dqblk, 0, sizeof(struct obd_dqblk));
-               oqctl->qc_dqblk.dqb_valid = QIF_LIMITS;
-               rc = 0;
+       case Q_GETINFO:  /* read grace times */
+               /* read inode grace time */
+               rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type,
+                                &oqctl->qc_dqinfo.dqi_igrace);
+               if (rc)
+                       break;
+
+               /* read block grace time */
+               rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type,
+                                &oqctl->qc_dqinfo.dqi_bgrace);
+               break;
+
+       case Q_SETINFO:  /* modify grace times */
+               /* setinfo should be using dqi->dqi_valid, but lfs incorrectly
+                * sets the valid flags in dqb->dqb_valid instead, try to live
+                * with that ... */
+               if ((dqb->dqb_valid & QIF_ITIME) != 0) {
+                       /* set inode grace time */
+                       rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_MD,
+                                        oqctl->qc_type,
+                                        oqctl->qc_dqinfo.dqi_igrace);
+                       if (rc)
+                               break;
+               }
+
+               if ((dqb->dqb_valid & QIF_BTIME) != 0)
+                       /* set block grace time */
+                       rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_DT,
+                                        oqctl->qc_type,
+                                        oqctl->qc_dqinfo.dqi_bgrace);
+               break;
+
+       case Q_GETQUOTA: /* consult quota limit */
+               /* There is no quota limit for root user & group */
+               if (oqctl->qc_id == 0) {
+                       memset(dqb, 0, sizeof(*dqb));
+                       dqb->dqb_valid = QIF_LIMITS | QIF_TIMES;
+                       break;
+               }
+               /* extract quota ID from quotactl request */
+               id->qid_uid = oqctl->qc_id;
+
+               /* look-up inode quota settings */
+               rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type,
+                                 id, &dqb->dqb_ihardlimit,
+                                 &dqb->dqb_isoftlimit, &dqb->dqb_itime);
+               if (rc)
+                       break;
+
+               dqb->dqb_valid |= QIF_ILIMITS | QIF_ITIME;
+               /* master isn't aware of actual inode usage */
+               dqb->dqb_curinodes = 0;
+
+               /* look-up block quota settings */
+               rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type,
+                                 id, &dqb->dqb_bhardlimit,
+                                 &dqb->dqb_bsoftlimit, &dqb->dqb_btime);
+               if (rc)
+                       break;
+
+               dqb->dqb_valid |= QIF_BLIMITS | QIF_BTIME;
+               /* master doesn't know the actual block usage */
+               dqb->dqb_curspace = 0;
+               break;
+
+       case Q_SETQUOTA: /* change quota limits */
+               if (oqctl->qc_id == 0)
+                       /* can't enforce a quota limit for root user & group */
+                       RETURN(-EPERM);
+               /* extract quota ID from quotactl request */
+               id->qid_uid = oqctl->qc_id;
+
+               if ((dqb->dqb_valid & QIF_IFLAGS) != 0) {
+                       /* update inode quota settings */
+                       rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_MD,
+                                         oqctl->qc_type, id,
+                                         dqb->dqb_ihardlimit,
+                                         dqb->dqb_isoftlimit, dqb->dqb_itime,
+                                         dqb->dqb_valid & QIF_IFLAGS);
+                       if (rc)
+                               break;
+               }
+
+               if ((dqb->dqb_valid & QIF_BFLAGS) != 0)
+                       /* update block quota settings */
+                       rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_DT,
+                                         oqctl->qc_type, id,
+                                         dqb->dqb_bhardlimit,
+                                         dqb->dqb_bsoftlimit, dqb->dqb_btime,
+                                         dqb->dqb_valid & QIF_BFLAGS);
                break;
 
+       case Q_QUOTAON:
+       case Q_QUOTAOFF:   /* quota is always turned on on the master */
+               RETURN(0);
+
+       case LUSTRE_Q_INVALIDATE: /* not supported any more */
+               RETURN(-ENOTSUPP);
+
        default:
                CERROR("%s: unsupported quotactl command: %d\n",
                       qmt->qmt_svname, oqctl->qc_cmd);
-               RETURN(-EFAULT);
+               RETURN(-ENOTSUPP);
        }
 
        RETURN(rc);
@@ -104,6 +448,8 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
        if (repbody == NULL)
                RETURN(err_serious(-EFAULT));
 
+       /* XXX: to be implemented */
+
        RETURN(0);
 }
 
index abe782d..7802006 100644 (file)
@@ -52,12 +52,125 @@ struct qmt_device {
         * index files */
        struct obd_export       *qmt_child_exp;
        struct dt_device        *qmt_child;
+
+       /* pointer to ldlm namespace to be used for quota locks */
+       struct ldlm_namespace   *qmt_ns;
+
+       /* Hash table containing a qmt_pool_info structure for each pool
+        * this quota master is in charge of. We only have 2 pools in this
+        * hash for the time being:
+        * - one for quota management on the default metadata pool
+        * - one for quota management on the default data pool
+        *
+        * Once we support quota on non-default pools, then more pools will
+        * be added to this hash table and pool master setup would have to be
+        * handled via configuration logs */
+       cfs_hash_t              *qmt_pool_hash;
+
+       /* List of pools managed by this master target */
+       cfs_list_t               qmt_pool_list;
+
+       /* procfs root directory for this qmt */
+       cfs_proc_dir_entry_t    *qmt_proc;
+
+       unsigned long            qmt_stopping:1; /* qmt is stopping */
+
+};
+
+/*
+ * Per-pool quota information.
+ * The qmt creates one such structure for each pool
+ * with quota enforced. All the structures are kept in a hash which is used to
+ * determine whether or not quota is enforced for a given pool.
+ * We currently only support the default data pool and default metadata pool
+ * with the pool_id 0.
+ */
+struct qmt_pool_info {
+       /* link to qmt's pool hash */
+       cfs_hlist_node_t         qpi_hash;
+
+       /* chained list of all pools managed by the same qmt */
+       cfs_list_t               qpi_linkage;
+
+       /* Pool key composed of pool_id | (pool_type << 16)
+        * Only pool ID 0 is supported for now and the pool type is either
+        * QUOTA_RES_MD or QUOTA_RES_DT.
+        * immutable after creation. */
+       __u32                    qpi_key;
+
+       /* track users of this pool instance */
+       cfs_atomic_t             qpi_ref;
+
+       /* back pointer to master target
+        * immutable after creation. */
+       struct qmt_device       *qpi_qmt;
+
+       /* pointer to dt object associated with global indexes for both user
+        * and group quota */
+       struct dt_object        *qpi_glb_obj[MAXQUOTAS];
+
+       /* A pool supports two different quota types: user and group quota.
+        * Each quota type has its own global index and lquota_entry hash table.
+        */
+       struct lquota_site      *qpi_site[MAXQUOTAS];
+
+       /* number of slaves registered for each quota type */
+       int                      qpi_slv_nr[MAXQUOTAS];
+
+       /* procfs root directory for this pool */
+       cfs_proc_dir_entry_t    *qpi_proc;
+
+       /* pool directory where all indexes related to this pool instance are
+        * stored */
+       struct dt_object        *qpi_root;
+
+       /* Global quota parameters which apply to all quota types */
+       /* the least value of qunit */
+       unsigned long            qpi_least_qunit;
+};
+
+/*
+ * Helper routines and prototypes
+ */
+
+/* helper routine to find the qmt_pool_info associated with a lquota_entry;
+ * on the master, the site parent pointer is the owning pool */
+static inline struct qmt_pool_info *lqe2qpi(struct lquota_entry *lqe)
+{
+       LASSERT(lqe_is_master(lqe));
+       return (struct qmt_pool_info *)lqe->lqe_site->lqs_parent;
+}
+
+/* return true if someone holds either a read or write lock on the lqe.
+ * NOTE(review): this probes the semaphore with a write trylock, so the result
+ * is only meaningful for assertions (LASSERT) — it can be stale by the time
+ * the caller looks at it */
+static inline bool lqe_is_locked(struct lquota_entry *lqe)
+{
+       LASSERT(lqe_is_master(lqe));
+       if (cfs_down_write_trylock(&lqe->lqe_sem) == 0)
+               /* trylock failed: some thread holds it in read or write mode */
+               return true;
+       /* trylock succeeded, so nobody held the lock; undo our own lock */
+       lqe_write_unlock(lqe);
+       return false;
+}
+
+/* values to be restored if something goes wrong during lqe writeback; filled
+ * by qmt_trans_start() before the update and applied via qmt_restore() */
+struct qmt_lqe_restore {
+       __u64   qlr_hardlimit;
+       __u64   qlr_softlimit;
+       __u64   qlr_gracetime;
+       __u64   qlr_granted;
+       __u64   qlr_qunit;
+};
 
 /* Common data shared by qmt handlers */
 struct qmt_thread_info {
        union lquota_rec        qti_rec;
        union lquota_id         qti_id;
+       /* second scratch ID, used e.g. to look up the grace-time entry (ID 0)
+        * while qti_id still holds the ID from the request */
+       union lquota_id         qti_id_bis;
+       char                    qti_buf[MTI_NAME_MAXLEN];
+       struct lu_fid           qti_fid;
+       struct ldlm_res_id      qti_resid;
+       union ldlm_gl_desc      qti_gl_desc;
+       struct quota_body       qti_body;
+       struct quota_body       qti_repbody;
+       /* saved lqe settings for rollback on failed writeback */
+       struct qmt_lqe_restore  qti_restore;
 };
 
 extern struct lu_context_key qmt_thread_key;
@@ -89,6 +202,47 @@ static inline struct lu_device *qmt2lu_dev(struct qmt_device *qmt)
        return &qmt->qmt_dt_dev.dd_lu_dev;
 }
 
+#define LQE_ROOT(lqe)    (lqe2qpi(lqe)->qpi_root)
+#define LQE_GLB_OBJ(lqe) (lqe2qpi(lqe)->qpi_glb_obj[lqe->lqe_site->lqs_qtype])
+
+/* roll lqe settings back to the values saved in \a restore before a failed
+ * disk update; caller must hold the lqe write lock */
+static inline void qmt_restore(struct lquota_entry *lqe,
+                              struct qmt_lqe_restore *restore)
+{
+       lqe->lqe_hardlimit = restore->qlr_hardlimit;
+       lqe->lqe_softlimit = restore->qlr_softlimit;
+       lqe->lqe_gracetime = restore->qlr_gracetime;
+       lqe->lqe_granted   = restore->qlr_granted;
+       lqe->lqe_qunit     = restore->qlr_qunit;
+}
+
+/* qmt_pool.c */
+void qmt_pool_fini(const struct lu_env *, struct qmt_device *);
+int qmt_pool_init(const struct lu_env *, struct qmt_device *);
+int qmt_pool_prepare(const struct lu_env *, struct qmt_device *,
+                  struct dt_object *);
+int qmt_pool_new_conn(const struct lu_env *, struct qmt_device *,
+                     struct lu_fid *, struct lu_fid *, __u64 *,
+                     struct obd_uuid *);
+struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *,
+                                        struct qmt_device *, int, int, int,
+                                        union lquota_id *);
+/* qmt_entry.c */
+extern struct lquota_entry_operations qmt_lqe_ops;
+struct thandle *qmt_trans_start_with_slv(const struct lu_env *,
+                                        struct lquota_entry *,
+                                        struct dt_object *,
+                                        struct qmt_lqe_restore *);
+struct thandle *qmt_trans_start(const struct lu_env *, struct lquota_entry *,
+                               struct qmt_lqe_restore *);
+int qmt_glb_write(const struct lu_env *, struct thandle *,
+                 struct lquota_entry *, __u32, __u64 *);
+int qmt_slv_write(const struct lu_env *, struct thandle *,
+                 struct lquota_entry *, struct dt_object *, __u32, __u64 *,
+                 __u64);
+int qmt_slv_read(const struct lu_env *, struct lquota_entry *,
+                struct dt_object *, __u64 *);
+int qmt_validate_limits(struct lquota_entry *, __u64, __u64);
+
 /* qmt_lock.c */
 int qmt_intent_policy(const struct lu_env *, struct lu_device *,
                      struct ptlrpc_request *, struct ldlm_lock **, int);
diff --git a/lustre/quota/qmt_pool.c b/lustre/quota/qmt_pool.c
new file mode 100644 (file)
index 0000000..3bea3d9
--- /dev/null
@@ -0,0 +1,642 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu    Yawei    <yawei.niu@intel.com>
+ */
+
+/*
+ * A Quota Master Target has a hash table where it stores qmt_pool_info
+ * structures. There is one such structure for each pool managed by the QMT.
+ *
+ * Each pool can have different quota types enforced (typically user & group
+ * quota). A pool is in charge of managing lquota_entry structures for each
+ * quota type. This is done by creating one lquota_entry site per quota
+ * type. A site stores entries in a hash table and reads quota settings from disk
+ * when a given ID isn't present in the hash.
+ *
+ * The pool API exported here is the following:
+ * - qmt_pool_init(): initializes the general QMT structures used to manage
+ *                    pools.
+ * - qmt_pool_fini(): frees the structures allocated by qmt_pool_init().
+ * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
+ * - qmt_pool_new_conn(): is used to create a new slave index file.
+ * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
+ *                          a given ID.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <obd_class.h>
+#include <lprocfs_status.h>
+#include "qmt_internal.h"
+
+static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
+
+/*
+ * Static helper functions not used outside the scope of this file
+ */
+
+/*
+ * Reference counter management for qmt_pool_info structures
+ */
+/* grab an extra reference on a pool */
+static inline void qpi_getref(struct qmt_pool_info *pool)
+{
+       cfs_atomic_inc(&pool->qpi_ref);
+}
+
+/* release a reference on a pool; the pool is freed when the last reference
+ * is dropped */
+static inline void qpi_putref(const struct lu_env *env,
+                             struct qmt_pool_info *pool)
+{
+       /* use the cfs_atomic_* wrappers consistently with qpi_getref() and
+        * qpi_putref_locked() */
+       LASSERT(cfs_atomic_read(&pool->qpi_ref) > 0);
+       if (cfs_atomic_dec_and_test(&pool->qpi_ref))
+               qmt_pool_free(env, pool);
+}
+
+/* release a reference while the hash bucket lock is held; the last reference
+ * (which would call qmt_pool_free()) must not be dropped here, hence the
+ * stricter > 1 assertion — presumably freeing under the hash lock would
+ * deadlock or recurse into the hash; confirm against cfs_hash usage */
+static inline void qpi_putref_locked(struct qmt_pool_info *pool)
+{
+       LASSERT(cfs_atomic_read(&pool->qpi_ref) > 1);
+       cfs_atomic_dec(&pool->qpi_ref);
+}
+
+/*
+ * Hash functions for qmt_pool_info management
+ */
+
+/* hash the 32-bit pool key (pool_id | pool_type << 16) into a bucket index */
+static unsigned qpi_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+{
+       return cfs_hash_u32_hash(*((__u32 *)key), mask);
+}
+
+/* return the lookup key of a hashed pool, i.e. its qpi_key */
+static void *qpi_hash_key(cfs_hlist_node_t *hnode)
+{
+       struct qmt_pool_info *pool;
+       pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+       return &pool->qpi_key;
+}
+
+/* return non-zero when the hashed pool matches the lookup key */
+static int qpi_hash_keycmp(const void *key, cfs_hlist_node_t *hnode)
+{
+       struct qmt_pool_info *pool;
+       pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+       return pool->qpi_key == *((__u32 *)key);
+}
+
+/* convert a hash node back to its enclosing qmt_pool_info */
+static void *qpi_hash_object(cfs_hlist_node_t *hnode)
+{
+       return cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+}
+
+/* take a pool reference on behalf of the hash table */
+static void qpi_hash_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       struct qmt_pool_info *pool;
+       pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+       qpi_getref(pool);
+}
+
+/* drop the hash table's pool reference with the bucket lock held */
+static void qpi_hash_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       struct qmt_pool_info *pool;
+       pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+       qpi_putref_locked(pool);
+}
+
+/* called at hash teardown; all pools should have been removed by then */
+static void qpi_hash_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       CERROR("Should not have any item left!\n");
+}
+
+/* vector of hash operations; note that only hs_put_locked is provided, so
+ * hash references are always dropped under the bucket lock */
+static cfs_hash_ops_t qpi_hash_ops = {
+       .hs_hash        = qpi_hash_hash,
+       .hs_key         = qpi_hash_key,
+       .hs_keycmp      = qpi_hash_keycmp,
+       .hs_object      = qpi_hash_object,
+       .hs_get         = qpi_hash_get,
+       .hs_put_locked  = qpi_hash_put_locked,
+       .hs_exit        = qpi_hash_exit
+};
+
+/* some procfs helpers */
+/* procfs read handler dumping the state of one pool (id, type, refcount,
+ * least qunit, and per-quota-type slave/lqe counts) into \a page */
+static int lprocfs_qpi_rd_state(char *page, char **start, off_t off,
+                               int count, int *eof, void *data)
+{
+       struct qmt_pool_info    *pool = (struct qmt_pool_info *)data;
+       int                      type, i = 0;
+
+       LASSERT(pool != NULL);
+
+       /* i accumulates the number of bytes written so far.
+        * NOTE(review): snprintf return values are not clamped to count, so a
+        * page-sized overflow would make count - i go negative — presumably
+        * the output always fits in one page; confirm */
+       i = snprintf(page, count,
+                    "pool:\n"
+                    "    id: %u\n"
+                    "    type: %s\n"
+                    "    ref: %d\n"
+                    "    least qunit: %lu\n",
+                    pool->qpi_key & 0x0000ffff,
+                    RES_NAME(pool->qpi_key >> 16),
+                    cfs_atomic_read(&pool->qpi_ref),
+                    pool->qpi_least_qunit);
+
+
+       for (type = 0; type < MAXQUOTAS; type++)
+               i += snprintf(page + i, count - i,
+                             "    %s:\n"
+                             "        #slv: %d\n"
+                             "        #lqe: %d\n",
+                             QTYPE_NAME(type),
+                             pool->qpi_slv_nr[type],
+                   cfs_atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
+
+       return i;
+}
+
+/* per-pool procfs entries: a single read-only "info" file */
+static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
+       { "info", lprocfs_qpi_rd_state, 0, 0},
+       { NULL }
+};
+
+/*
+ * Allocate a new qmt_pool_info structure and add it to the pool hash table
+ * of the qmt.
+ *
+ * \param env       - is the environment passed by the caller
+ * \param qmt       - is the quota master target
+ * \param pool_id   - is the 16-bit pool identifier of the new pool to add
+ * \param pool_type - is the resource type of this pool instance, either
+ *                    LQUOTA_RES_MD or LQUOTA_RES_DT.
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
+                         int pool_id, int pool_type)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    *pool;
+       int                      rc = 0;
+       ENTRY;
+
+       OBD_ALLOC_PTR(pool);
+       if (pool == NULL)
+               RETURN(-ENOMEM);
+       CFS_INIT_LIST_HEAD(&pool->qpi_linkage);
+
+       /* assign key used by hash functions */
+       pool->qpi_key = pool_id + (pool_type << 16);
+
+       /* initialize refcount to 1, hash table will then grab an additional
+        * reference. Use the cfs_atomic_* wrapper for consistency with the
+        * other qpi_ref accessors (qpi_getref & co) */
+       cfs_atomic_set(&pool->qpi_ref, 1);
+
+       /* set up least qunit size to use for this pool */
+       pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
+
+       /* create pool proc directory */
+       sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
+       pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
+                                         lprocfs_quota_qpi_vars, pool);
+       if (IS_ERR(pool->qpi_proc)) {
+               rc = PTR_ERR(pool->qpi_proc);
+               CERROR("%s: failed to create proc entry for pool %s (%d)\n",
+                      qmt->qmt_svname, qti->qti_buf, rc);
+               pool->qpi_proc = NULL;
+               GOTO(out, rc);
+       }
+
+       /* grab reference on master target that this pool belongs to */
+       lu_device_get(qmt2lu_dev(qmt));
+       lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
+       pool->qpi_qmt = qmt;
+
+       /* add to qmt hash table */
+       rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key,
+                                &pool->qpi_hash);
+       if (rc) {
+               CERROR("%s: failed to add pool %s to qmt hash (%d)\n",
+                      qmt->qmt_svname, qti->qti_buf, rc);
+               GOTO(out, rc);
+       }
+
+       /* add to qmt pool list */
+       cfs_list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
+       EXIT;
+out:
+       if (rc)
+               /* this frees the pool structure since refcount is equal to 1 */
+               qpi_putref(env, pool);
+       return rc;
+}
+
+/*
+ * Delete a qmt_pool_info instance and all structures associated.
+ *
+ * \param env  - is the environment passed by the caller
+ * \param pool - is the qmt_pool_info structure to free
+ */
+static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
+{
+       int     qtype;
+       ENTRY;
+
+       /* release proc entry */
+       if (pool->qpi_proc) {
+               lprocfs_remove(&pool->qpi_proc);
+               pool->qpi_proc = NULL;
+       }
+
+       /* release per-quota type site used to manage quota entries as well as
+        * references to global index files */
+       for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+               /* release site */
+               if (pool->qpi_site[qtype] != NULL &&
+                   !IS_ERR(pool->qpi_site[qtype]))
+                       lquota_site_free(env, pool->qpi_site[qtype]);
+               /* release reference to global index */
+               if (pool->qpi_glb_obj[qtype] != NULL &&
+                   !IS_ERR(pool->qpi_glb_obj[qtype]))
+                       lu_object_put(env, &pool->qpi_glb_obj[qtype]->do_lu);
+       }
+
+       /* release reference on pool directory */
+       if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
+               lu_object_put(env, &pool->qpi_root->do_lu);
+
+       /* release reference on the master target. Unwind in reverse order of
+        * qmt_pool_alloc(): delete the lu_ref tracking entry *before* dropping
+        * the device reference so that ld->ld_reference is never touched
+        * after the put */
+       if (pool->qpi_qmt != NULL) {
+               struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
+
+               lu_ref_del(&ld->ld_reference, "pool", pool);
+               lu_device_put(ld);
+               pool->qpi_qmt = NULL;
+       }
+
+       /* pool must have been unlinked from the qmt pool list by now */
+       LASSERT(cfs_list_empty(&pool->qpi_linkage));
+       OBD_FREE_PTR(pool);
+       EXIT;
+}
+
+/*
+ * Look-up a pool in the hash table based on the pool ID and type.
+ *
+ * \param env     - is the environment passed by the caller
+ * \param qmt     - is the quota master target
+ * \param pool_id   - is the 16-bit identifier of the pool to look up
+ * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
+ *                    LQUOTA_RES_DT.
+ */
+static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
+                                            struct qmt_device *qmt,
+                                            int pool_id, int pool_type)
+{
+       struct qmt_pool_info    *pool;
+       __u32                    key;
+       ENTRY;
+
+       LASSERT(qmt->qmt_pool_hash != NULL);
+
+       /* look-up pool in hash table; the key encoding must match the one set
+        * up in qmt_pool_alloc(). On success, the pool is returned with a
+        * reference held that callers release via qpi_putref() */
+       key = pool_id + (pool_type << 16);
+       pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key);
+       if (pool == NULL) {
+               /* this qmt isn't managing this pool! */
+               CERROR("%s: looking up quota entry for a pool (0x%x/%d) which "
+                      "isn't managed by this quota master target\n",
+                      qmt->qmt_svname, pool_id, pool_type);
+               RETURN(ERR_PTR(-ENOENT));
+       }
+       RETURN(pool);
+}
+
+/*
+ * Functions implementing the pool API, used by the qmt handlers
+ */
+
+/*
+ * Destroy all pools which are still in the hash table and free the pool
+ * hash table.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ *
+ */
+void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
+{
+       struct qmt_pool_info    *pool;
+       cfs_list_t              *pos, *n;
+       ENTRY;
+
+       /* nothing to do if qmt_pool_init() was never called or failed before
+        * the hash table was created */
+       if (qmt->qmt_pool_hash == NULL)
+               RETURN_EXIT;
+
+       /* parse list of pool and destroy each element */
+       cfs_list_for_each_safe(pos, n, &qmt->qmt_pool_list) {
+               pool = cfs_list_entry(pos, struct qmt_pool_info,
+                                     qpi_linkage);
+               /* remove from hash */
+               cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key,
+                            &pool->qpi_hash);
+
+               /* remove from list */
+               cfs_list_del_init(&pool->qpi_linkage);
+
+               /* release extra reference taken in qmt_pool_alloc; the last
+                * reference holder frees the pool via qmt_pool_free() */
+               qpi_putref(env, pool);
+       }
+       LASSERT(cfs_list_empty(&qmt->qmt_pool_list));
+
+       cfs_hash_putref(qmt->qmt_pool_hash);
+       qmt->qmt_pool_hash = NULL;
+       EXIT;
+}
+
+/*
+ * Initialize pool configure for the quota master target. For now, we only
+ * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
+ * pool which are instantiated in this function.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target for which we have to initialize the
+ *              pool configuration
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
+{
+       int     res, rc = 0;
+       ENTRY;
+
+       /* initialize pool hash table */
+       qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH",
+                                            HASH_POOLS_CUR_BITS,
+                                            HASH_POOLS_MAX_BITS,
+                                            HASH_POOLS_BKT_BITS, 0,
+                                            CFS_HASH_MIN_THETA,
+                                            CFS_HASH_MAX_THETA,
+                                            &qpi_hash_ops,
+                                            CFS_HASH_DEFAULT)
+       if (qmt->qmt_pool_hash == NULL) {
+               CERROR("%s: failed to create pool hash table\n",
+                      qmt->qmt_svname);
+               RETURN(-ENOMEM);
+       }
+
+       /* initialize pool list */
+       CFS_INIT_LIST_HEAD(&qmt->qmt_pool_list);
+
+       /* Instantiate pool master for the default data and metadata pool (both
+        * have pool ID equals to 0).
+        * This code will have to be revisited once we support quota on
+        * non-default pools */
+       for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
+               rc = qmt_pool_alloc(env, qmt, 0, res);
+               if (rc)
+                       break;
+       }
+
+       /* on failure, release the pools allocated so far as well as the hash
+        * table itself */
+       if (rc)
+               qmt_pool_fini(env, qmt);
+
+       RETURN(rc);
+}
+
+/* Callback passed to lquota_disk_for_each_slv() in qmt_pool_prepare();
+ * bumps the per-type slave counter (passed via \a arg) once for each slave
+ * index file found on disk */
+static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
+                      char *slv_name, struct lu_fid *slv_fid, void *arg)
+{
+       int *nr = arg;
+
+       /* one more slave */
+       (*nr)++;
+
+       return 0;
+}
+
+/*
+ * Set up on-disk index files associated with each pool.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target for which we have to initialize the
+ *              pool configuration
+ * \param qmt_root - is the on-disk directory created for the QMT.
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
+                    struct dt_object *qmt_root)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       struct qmt_pool_info    *pool;
+       struct dt_device        *dev = NULL;
+       cfs_list_t              *pos;
+       int                      rc = 0, qtype;
+       ENTRY;
+
+       LASSERT(qmt->qmt_pool_hash != NULL);
+
+       /* iterate over each pool in the hash and allocate a quota site for each
+        * one. This involves creating a global index file on disk */
+       cfs_list_for_each(pos, &qmt->qmt_pool_list) {
+               struct dt_object *obj;
+               int               pool_type, pool_id;
+
+               pool = cfs_list_entry(pos, struct qmt_pool_info,
+                                     qpi_linkage);
+
+               /* decode pool ID/type from the key set up in
+                * qmt_pool_alloc() */
+               pool_id   = pool->qpi_key & 0x0000ffff;
+               pool_type = pool->qpi_key >> 16;
+               if (dev == NULL)
+                       dev = pool->qpi_qmt->qmt_child;
+
+               /* allocate directory for this pool */
+               sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
+               obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
+                                                 qti->qti_buf);
+               if (IS_ERR(obj))
+                       RETURN(PTR_ERR(obj));
+               pool->qpi_root = obj;
+
+               for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+                       /* Generating FID of global index in charge of storing
+                        * settings for this quota type */
+                       lquota_generate_fid(&qti->qti_fid, pool_id, pool_type,
+                                           qtype);
+
+                       /* open/create the global index file for this quota
+                        * type */
+                       obj = lquota_disk_glb_find_create(env, dev,
+                                                         pool->qpi_root,
+                                                         &qti->qti_fid, false);
+                       if (IS_ERR(obj)) {
+                               rc = PTR_ERR(obj);
+                               CERROR("%s: failed to create glb index copy for"
+                                      " %s type (%d)\n", qmt->qmt_svname,
+                                      QTYPE_NAME(qtype), rc);
+                               /* NOTE(review): objects already attached to the
+                                * pool are left for the caller to release,
+                                * presumably via qmt_pool_fini() - confirm */
+                               RETURN(rc);
+                       }
+
+                       pool->qpi_glb_obj[qtype] = obj;
+
+                       /* create quota entry site for this quota type */
+                       pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
+                                                                 true, qtype,
+                                                                 &qmt_lqe_ops);
+                       if (IS_ERR(pool->qpi_site[qtype])) {
+                               rc = PTR_ERR(pool->qpi_site[qtype]);
+                               CERROR("%s: failed to create site for %s type "
+                                      "(%d)\n", qmt->qmt_svname,
+                                      QTYPE_NAME(qtype), rc);
+                               RETURN(rc);
+                       }
+
+                       /* count number of slaves which already connected to
+                        * the master in the past */
+                       pool->qpi_slv_nr[qtype] = 0;
+                       rc = lquota_disk_for_each_slv(env, pool->qpi_root,
+                                                     &qti->qti_fid,
+                                                     qmt_slv_cnt,
+                                                     &pool->qpi_slv_nr[qtype]);
+                       if (rc) {
+                               CERROR("%s: failed to scan & count slave "
+                                      "indexes for %s type (%d)\n",
+                                      qmt->qmt_svname, QTYPE_NAME(qtype), rc);
+                               RETURN(rc);
+                       }
+#ifdef LPROCFS
+                       /* add procfs file to dump the global index, mostly for
+                        * debugging purpose */
+                       sprintf(qti->qti_buf, "glb-%s", QTYPE_NAME(qtype));
+                       rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
+                                               0444, &lprocfs_quota_seq_fops,
+                                               obj);
+                       if (rc)
+                               /* non-fatal: only the debug dump is missing */
+                               CWARN("%s: Error adding procfs file for global"
+                                     "quota index "DFID", rc:%d\n",
+                                     qmt->qmt_svname, PFID(&qti->qti_fid), rc);
+#endif
+               }
+       }
+
+       RETURN(0);
+}
+
+/*
+ * Handle new slave connection. Called when a slave enqueues the global quota
+ * lock at the beginning of the reintegration procedure.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target handling this request
+ * \param glb_fid - is the fid of the global index file
+ * \param slv_fid - is the fid of the newly created slave index file
+ * \param slv_ver - is the current version of the slave index file
+ * \param uuid    - is the uuid of slave which is (re)connecting to the master
+ *                  target
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
+                     struct lu_fid *glb_fid, struct lu_fid *slv_fid,
+                     __u64 *slv_ver, struct obd_uuid *uuid)
+{
+       struct qmt_pool_info    *pool;
+       struct dt_object        *slv_obj;
+       int                      pool_id, pool_type, qtype;
+       bool                     created = false;
+       int                      rc = 0;
+       ENTRY;
+
+       /* extract pool info from global index FID */
+       rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype);
+       if (rc)
+               RETURN(rc);
+
+       /* look-up pool in charge of this global index FID */
+       pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
+       if (IS_ERR(pool))
+               RETURN(PTR_ERR(pool));
+
+       /* look-up slave index file */
+       slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
+                                      glb_fid, uuid);
+       if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
+               /* first connection of this slave: create its index file */
+               slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
+                                                     pool->qpi_root, glb_fid,
+                                                     uuid, false);
+               created = true;
+       }
+       if (IS_ERR(slv_obj)) {
+               rc = PTR_ERR(slv_obj);
+               CERROR("%s: failed to create quota slave index file for %s (%d)"
+                      "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
+               GOTO(out, rc);
+       }
+
+       /* retrieve slave fid & current object version */
+       memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
+       *slv_ver = dt_version_get(env, slv_obj);
+       lu_object_put(env, &slv_obj->do_lu);
+       /* only bump the slave count when a brand-new index was created */
+       if (created)
+               pool->qpi_slv_nr[qtype]++;
+out:
+       /* drop the reference taken by qmt_pool_lookup() */
+       qpi_putref(env, pool);
+       RETURN(rc);
+}
+
+/*
+ * Look-up a lquota_entry in the pool hash and allocate it if not found.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target managing the pool in which the
+ *              entry is looked up
+ * \param pool_id   - is the 16-bit identifier of the pool
+ * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
+ * \param qtype     - is the quota type, either user or group.
+ * \param qid       - is the quota ID to look-up
+ *
+ * \retval - valid pointer to lquota entry on success, appropriate error on
+ *           failure
+ */
+struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
+                                        struct qmt_device *qmt,
+                                        int pool_id, int pool_type,
+                                        int qtype, union lquota_id *qid)
+{
+       struct qmt_pool_info    *pool;
+       struct lquota_entry     *lqe;
+       ENTRY;
+
+       /* look-up pool responsible for this global index FID */
+       pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
+       if (IS_ERR(pool))
+               /* propagate the error pointer to the caller */
+               RETURN((void *)pool);
+
+       /* now that we have the pool, let's look-up the quota entry in the
+        * right quota site; per the contract above, the entry is allocated
+        * if not found */
+       lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
+
+       /* drop the reference taken by qmt_pool_lookup() */
+       qpi_putref(env, pool);
+       RETURN(lqe);
+}