{
cfs_atomic_inc(&ns->ns_bref);
}
+EXPORT_SYMBOL(ldlm_namespace_get);
void ldlm_namespace_put(struct ldlm_namespace *ns)
{
cfs_spin_unlock(&ns->ns_lock);
}
}
+EXPORT_SYMBOL(ldlm_namespace_put);
/* Register @ns in the list of namespaces */
void ldlm_namespace_register(struct ldlm_namespace *ns, ldlm_side_t client)
MODULES := lquota
-qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o
+qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o qmt_entry.o qmt_pool.o
quota-objs := lproc_quota.o lquota_lib.o lquota_disk.o lquota_entry.o
* in the LICENSE file that accompanied this code).
*
* You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
*
* GPL HEADER END
*/
/*
- * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012 Intel, Inc.
* Use is subject to license terms.
*
- * Copyright (c) 2011, Whamcloud, Inc.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu Yawei <yawei.niu@intel.com>
*/
#define DEBUG_SUBSYSTEM S_LQUOTA
/*
* Output example:
*
- * user_accounting:
+ * usr_accounting:
* - id: 0
- * usage: { inodes: 209, bytes: 26161152 }
+ * usage: { inodes: 209, kbytes: 2616 }
* - id: 840000038
- * usage: { inodes: 1, bytes: 10485760 }
+ * usage: { inodes: 1, kbytes: 1048 }
*/
static int lprocfs_quota_seq_show(struct seq_file *p, void *v)
{
- struct lquota_procfs *lqp = p->private;
- const struct dt_it_ops *iops;
- struct dt_it *it;
- struct dt_key *key;
- struct lquota_acct_rec rec;
- int rc;
+ struct lquota_procfs *lqp = p->private;
+ struct lquota_thread_info *qti = lquota_info(&lqp->lqp_env);
+ const struct dt_it_ops *iops;
+ struct dt_it *it;
+ struct dt_key *key;
+ struct dt_rec *rec = (struct dt_rec *)&qti->qti_rec;
+ const struct lu_fid *fid;
+ int rc;
LASSERT(lqp);
if (lqp->lqp_obj == NULL) {
return 0;
}
- if (v == SEQ_START_TOKEN) {
- const struct lu_fid *fid = lu_object_fid(&lqp->lqp_obj->do_lu);
+ fid = lu_object_fid(&lqp->lqp_obj->do_lu);
- LASSERT(fid_is_acct(fid));
- if (fid_oid(fid) == ACCT_USER_OID)
- seq_printf(p, "user_accounting:\n");
- else
- seq_printf(p, "group_accounting:\n");
+ if (v == SEQ_START_TOKEN) {
+ if (fid_is_acct(fid)) {
+ if (fid_oid(fid) == ACCT_USER_OID)
+ seq_printf(p, "usr_accounting:\n");
+ else
+ seq_printf(p, "grp_accounting:\n");
+ } else if (fid_seq(fid) == FID_SEQ_QUOTA_GLB) {
+ int poolid, rtype, qtype;
+
+ rc = lquota_extract_fid(fid, &poolid, &rtype, &qtype);
+ if (rc)
+ return rc;
+
+ seq_printf(p, "global_pool%d_%s_%s\n", poolid,
+ RES_NAME(rtype), QTYPE_NAME(qtype));
+ } else {
+ return -ENOTSUPP;
+ }
return 0;
}
return PTR_ERR(key);
}
- rc = iops->rec(&lqp->lqp_env, it, (struct dt_rec *)&rec, 0);
+ rc = iops->rec(&lqp->lqp_env, it, rec, 0);
if (rc) {
CERROR("%s: failed to get rec: rc = %d\n",
lqp->lqp_obj->do_lu.lo_dev->ld_obd->obd_name, rc);
}
seq_printf(p, "- %-8s %llu\n", "id:", *((__u64 *)key));
- seq_printf(p, " %-8s { inodes: %20"LPF64"u, bytes: %20"LPF64"u }\n",
- "usage:", rec.ispace, rec.bspace);
+ if (fid_is_acct(fid))
+ seq_printf(p, " %-8s { inodes: %20"LPF64"u, kbytes: %20"LPF64
+ "u }\n", "usage:",
+ ((struct lquota_acct_rec *)rec)->ispace,
+ toqb(((struct lquota_acct_rec *)rec)->bspace));
+ else if (fid_seq(fid) == FID_SEQ_QUOTA_GLB)
+ seq_printf(p, " %-8s { hard: %20"LPF64"u, soft: %20"LPF64
+ "u, granted: %20"LPF64"u, time: %20"LPF64"u }\n",
+ "limits:",
+ ((struct lquota_glb_rec *)rec)->qbr_hardlimit,
+ ((struct lquota_glb_rec *)rec)->qbr_softlimit,
+ ((struct lquota_glb_rec *)rec)->qbr_granted,
+ ((struct lquota_glb_rec *)rec)->qbr_time);
return 0;
}
obj = dt_locate(env, dev, &qti->qti_fid);
if (IS_ERR(obj))
GOTO(out, obj);
-
- /* install index operation vector */
- if (obj->do_index_ops == NULL) {
- rc = obj->do_ops->do_index_try(env, obj, idx_feat);
- if (rc) {
- CERROR("%s: fail to setup index operations for "DFID
- " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
- PFID(lu_object_fid(&obj->do_lu)), rc);
- lu_object_put(env, &obj->do_lu);
- obj = ERR_PTR(rc);
- }
- }
out:
local_oid_storage_fini(env, los);
RETURN(obj);
idx_feat);
}
- if (IS_ERR(glb_idx))
+ if (IS_ERR(glb_idx)) {
CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
"local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
PFID(fid), PTR_ERR(glb_idx), local);
+ RETURN(glb_idx);
+ }
+
+ /* install index operation vector */
+ if (glb_idx->do_index_ops == NULL) {
+ int rc;
+
+ rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat);
+ if (rc) {
+ CERROR("%s: failed to setup index operations for "DFID
+ " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(lu_object_fid(&glb_idx->do_lu)), rc);
+ lu_object_put(env, &glb_idx->do_lu);
+ glb_idx = ERR_PTR(rc);
+ }
+ }
RETURN(glb_idx);
}
qti->qti_buf);
}
+ if (IS_ERR(slv_idx))
+ RETURN(slv_idx);
+
+ /* install index operation vector */
+ if (slv_idx->do_index_ops == NULL) {
+ rc = slv_idx->do_ops->do_index_try(env, slv_idx,
+ &dt_quota_slv_features);
+ if (rc) {
+ CERROR("%s: failed to setup index operations for "DFID
+ " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(lu_object_fid(&slv_idx->do_lu)), rc);
+ lu_object_put(env, &slv_idx->do_lu);
+ slv_idx = ERR_PTR(rc);
+ }
+ }
+
RETURN(slv_idx);
}
static int hash_lqs_cur_bits = HASH_LQE_CUR_BITS;
CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444,
"the current bits of lqe hash");
-cfs_mem_cache_t *lqe_kmem;
static unsigned lqe64_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
#define _LQUOTA_INTERNAL_H
#define QTYPE_NAME(qtype) ((qtype) == USRQUOTA ? "usr" : "grp")
+#define RES_NAME(res) ((res) == LQUOTA_RES_MD ? "md" : "dt")
#define QIF_IFLAGS (QIF_INODES | QIF_ITIME | QIF_ILIMITS)
#define QIF_BFLAGS (QIF_SPACE | QIF_BTIME | QIF_BLIMITS)
cfs_read_unlock(&lqe->lqe_lock);
}
+/*
+ * Helper functions & prototypes
+ */
+
+/* minimum qunit size, 1K inode for metadata pool and 1MB for data pool */
+#define LQUOTA_LEAST_QUNIT(type) \
+ (type == LQUOTA_RES_MD ? (1 << 10) : toqb(PTLRPC_MAX_BRW_SIZE))
+
/* Common data shared by quota-level handlers. This is allocated per-thread to
* reduce stack consumption */
struct lquota_thread_info {
struct dt_object *acct_obj_lookup(const struct lu_env *, struct dt_device *,
int);
void lquota_generate_fid(struct lu_fid *, int, int, int);
-int lquota_extract_fid(struct lu_fid *, int *, int *, int *);
+int lquota_extract_fid(const struct lu_fid *, int *, int *, int *);
const struct dt_index_features *glb_idx_feature(struct lu_fid *);
+extern cfs_mem_cache_t *lqe_kmem;
/* lquota_entry.c */
/* site create/destroy */
#include "lquota_internal.h"
+cfs_mem_cache_t *lqe_kmem;
+
+struct lu_kmem_descr lquota_caches[] = {
+ {
+ .ckd_cache = &lqe_kmem,
+ .ckd_name = "lqe_kmem",
+ .ckd_size = sizeof(struct lquota_entry)
+ },
+ {
+ .ckd_cache = NULL
+ }
+};
+
/* register lquota key */
LU_KEY_INIT_FINI(lquota, struct lquota_thread_info);
LU_CONTEXT_KEY_DEFINE(lquota, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL);
* Helper routine used to extract pool ID, pool type and quota type from a
* given FID.
*/
-int lquota_extract_fid(struct lu_fid *fid, int *pool_id, int *pool_type,
+int lquota_extract_fid(const struct lu_fid *fid, int *pool_id, int *pool_type,
int *quota_type)
{
unsigned int tmp;
#warning "remove old quota compatibility code"
#endif
- rc = qmt_glb_init();
+ rc = lu_kmem_init(lquota_caches);
if (rc)
GOTO(out_key, rc);
+ rc = qmt_glb_init();
+ if (rc)
+ GOTO(out_caches, rc);
+
rc = qsd_glb_init();
if (rc)
GOTO(out_qmt, rc);
out_qmt:
qmt_glb_fini();
+out_caches:
+ lu_kmem_fini(lquota_caches);
out_key:
lu_context_key_degister(&lquota_thread_key);
return rc;
{
qsd_glb_fini();
qmt_glb_fini();
+ lu_kmem_fini(lquota_caches);
lu_context_key_degister(&lquota_thread_key);
}
LASSERT(qmt != NULL);
CDEBUG(D_QUOTA, "%s: initiating QMT shutdown\n", qmt->qmt_svname);
+ qmt->qmt_stopping = true;
+
+ /* remove qmt proc entry */
+ if (qmt->qmt_proc != NULL && !IS_ERR(qmt->qmt_proc)) {
+ lprocfs_remove(&qmt->qmt_proc);
+ qmt->qmt_proc = NULL;
+ }
+
+ /* kill pool instances, if any */
+ qmt_pool_fini(env, qmt);
/* disconnect from OSD */
if (qmt->qmt_child_exp != NULL) {
obd_disconnect(qmt->qmt_child_exp);
qmt->qmt_child_exp = NULL;
- qmt->qmt_child = NULL;
+ qmt->qmt_child = NULL;
+ }
+
+ /* release reference on MDT namespace */
+ if (ld->ld_obd->obd_namespace != NULL) {
+ ldlm_namespace_put(ld->ld_obd->obd_namespace);
+ ld->ld_obd->obd_namespace = NULL;
+ qmt->qmt_ns = NULL;
}
RETURN(NULL);
/* initialize site (although it isn't used anywhere) and lu_device
* pointer to next device */
qmt->qmt_child = lu2dt_dev(qmt->qmt_child_exp->exp_obd->obd_lu_dev);
- ld->ld_site = qmt->qmt_child_exp->exp_obd->obd_lu_dev->ld_site;
+ ld->ld_site = qmt->qmt_child_exp->exp_obd->obd_lu_dev->ld_site;
EXIT;
out:
if (data)
struct lu_device_type *ldt, struct lustre_cfg *cfg)
{
struct lu_device *ld = qmt2lu_dev(qmt);
- struct obd_device *obd;
+ struct obd_device *obd, *mdt_obd;
+ struct obd_type *type;
int rc;
ENTRY;
obd->obd_lu_dev = ld;
ld->ld_obd = obd;
+ /* look-up the parent MDT to steal its ldlm namespace ... */
+ mdt_obd = class_name2obd(lustre_cfg_string(cfg, 2));
+ if (mdt_obd == NULL)
+ RETURN(-ENOENT);
+
+ /* grab reference on MDT namespace. kind of a hack until we have our
+ * own namespace & service threads */
+ LASSERT(mdt_obd->obd_namespace != NULL);
+ ldlm_namespace_get(mdt_obd->obd_namespace);
+ obd->obd_namespace = mdt_obd->obd_namespace;
+ qmt->qmt_ns = obd->obd_namespace;
+
/* connect to backend osd device */
rc = qmt_connect_to_osd(env, qmt, cfg);
if (rc)
GOTO(out, rc);
+ /* at the moment there is no linkage between lu_type and obd_type, so
+ * we lookup obd_type this way */
+ type = class_search_type(LUSTRE_QMT_NAME);
+ LASSERT(type != NULL);
+
+ /* register proc directory associated with this qmt */
+ qmt->qmt_proc = lprocfs_register(qmt->qmt_svname, type->typ_procroot,
+ NULL, NULL);
+ if (IS_ERR(qmt->qmt_proc)) {
+ rc = PTR_ERR(qmt->qmt_proc);
+ CERROR("%s: failed to create qmt proc entry (%d)\n",
+ qmt->qmt_svname, rc);
+ GOTO(out, rc);
+ }
+
+ /* initialize pool configuration */
+ rc = qmt_pool_init(env, qmt);
+ if (rc)
+ GOTO(out, rc);
EXIT;
out:
if (rc)
struct lu_device *parent,
struct lu_device *ld)
{
- return 0;
+ struct qmt_device *qmt = lu2qmt_dev(ld);
+ struct dt_object *qmt_root;
+ int rc;
+ ENTRY;
+
+ /* initialize quota master root directory where all index files will be
+ * stored */
+ qmt_root = lquota_disk_dir_find_create(env, qmt->qmt_child, NULL,
+ QMT_DIR);
+ if (IS_ERR(qmt_root)) {
+ rc = PTR_ERR(qmt_root);
+ CERROR("%s: failed to create master quota directory (%d)\n",
+ qmt->qmt_svname, rc);
+ RETURN(rc);
+ }
+
+ /* initialize on-disk indexes associated with each pool */
+ rc = qmt_pool_prepare(env, qmt, qmt_root);
+
+ lu_object_put(env, &qmt_root->do_lu);
+ RETURN(rc);
}
/*
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu Yawei <yawei.niu@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include "qmt_internal.h"
+
+/*
+ * Initialize qmt-specific fields of quota entry.
+ *
+ * \param lqe - is the quota entry to initialize
+ * \param arg - is the pointer to the qmt_pool_info structure
+ */
+static void qmt_lqe_init(struct lquota_entry *lqe, void *arg)
+{
+ LASSERT(lqe_is_master(lqe));
+
+ lqe->lqe_revoke_time = 0;
+ cfs_init_rwsem(&lqe->lqe_sem);
+}
+
+/*
+ * Update a lquota entry. This is done by reading quota settings from the global
+ * index. The lquota entry must be write locked.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the quota entry to refresh
+ * \param arg - is the pointer to the qmt_pool_info structure
+ */
+static int qmt_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
+ void *arg)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct qmt_pool_info *pool = (struct qmt_pool_info *)arg;
+ int rc;
+ ENTRY;
+
+ LASSERT(lqe_is_master(lqe));
+
+ /* read record from disk */
+ rc = lquota_disk_read(env, pool->qpi_glb_obj[lqe->lqe_site->lqs_qtype],
+ &lqe->lqe_id, (struct dt_rec *)&qti->qti_glb_rec);
+
+ switch (rc) {
+ case -ENOENT:
+ /* no such entry, assume quota isn't enforced for this user */
+ lqe->lqe_enforced = false;
+ break;
+ case 0:
+ /* copy quota settings from on-disk record */
+ lqe->lqe_granted = qti->qti_glb_rec.qbr_granted;
+ lqe->lqe_hardlimit = qti->qti_glb_rec.qbr_hardlimit;
+ lqe->lqe_softlimit = qti->qti_glb_rec.qbr_softlimit;
+ lqe->lqe_gracetime = qti->qti_glb_rec.qbr_time;
+
+ if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
+ /* {hard,soft}limit=0 means no quota enforced */
+ lqe->lqe_enforced = false;
+ else
+ lqe->lqe_enforced = true;
+
+ break;
+ default:
+ LQUOTA_ERROR(lqe, "failed to read quota entry from disk, rc:%d",
+ rc);
+ RETURN(rc);
+ }
+
+ LQUOTA_DEBUG(lqe, "read");
+ RETURN(0);
+}
+
+/*
+ * Print lqe information for debugging.
+ *
+ * \param lqe - is the quota entry to debug
+ * \param arg - is the pointer to the qmt_pool_info structure
+ * \param msgdata - debug message
+ * \param fmt - format of debug message
+ */
+static void qmt_lqe_debug(struct lquota_entry *lqe, void *arg,
+ struct libcfs_debug_msg_data *msgdata,
+ const char *fmt, va_list args)
+{
+ struct qmt_pool_info *pool = (struct qmt_pool_info *)arg;
+
+ libcfs_debug_vmsg2(msgdata, fmt, args,
+ "qmt:%s pool:%d-%s id:"LPU64" enforced:%d hard:"LPU64
+ " soft:"LPU64" granted:"LPU64" time:"LPU64" qunit:"
+ LPU64" edquot:%d revoke:"LPU64"\n",
+ pool->qpi_qmt->qmt_svname,
+ pool->qpi_key & 0x0000ffff,
+ RES_NAME(pool->qpi_key >> 16),
+ lqe->lqe_id.qid_uid, lqe->lqe_enforced,
+ lqe->lqe_hardlimit, lqe->lqe_softlimit,
+ lqe->lqe_granted, lqe->lqe_gracetime,
+ lqe->lqe_qunit, lqe->lqe_edquot,
+ lqe->lqe_revoke_time);
+}
+
+/*
+ * Vector of quota entry operations supported on the master
+ */
+struct lquota_entry_operations qmt_lqe_ops = {
+ .lqe_init = qmt_lqe_init,
+ .lqe_read = qmt_lqe_read,
+ .lqe_debug = qmt_lqe_debug,
+};
+
+/*
+ * Reserve enough credits to update records in both the global index and
+ * the slave index identified by \slv_obj
+ *
+ * \param env - is the environment passed by the caller
+ * \param lqe - is the quota entry associated with the identifier
+ * subject to the change
+ * \param slv_obj - is the dt_object associated with the index file
+ * \param restore - is a temporary storage for current quota settings which will
+ * be restored if something goes wrong at index update time.
+ */
+struct thandle *qmt_trans_start_with_slv(const struct lu_env *env,
+ struct lquota_entry *lqe,
+ struct dt_object *slv_obj,
+ struct qmt_lqe_restore *restore)
+{
+ struct qmt_device *qmt;
+ struct thandle *th;
+ int rc;
+ ENTRY;
+
+ LASSERT(lqe != NULL);
+ LASSERT(lqe_is_master(lqe));
+
+ qmt = lqe2qpi(lqe)->qpi_qmt;
+
+ if (slv_obj != NULL)
+ LQUOTA_DEBUG(lqe, "declare write for slv "DFID,
+ PFID(lu_object_fid(&slv_obj->do_lu)));
+
+ /* start transaction */
+ th = dt_trans_create(env, qmt->qmt_child);
+ if (IS_ERR(th))
+ RETURN(th);
+
+ if (slv_obj == NULL)
+ /* quota settings on master are updated synchronously for the
+ * time being */
+ th->th_sync = 1;
+
+ /* reserve credits for global index update */
+ rc = lquota_disk_declare_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id);
+ if (rc)
+ GOTO(out, rc);
+
+ if (slv_obj != NULL) {
+ /* reserve credits for slave index update */
+ rc = lquota_disk_declare_write(env, th, slv_obj, &lqe->lqe_id);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ /* start transaction */
+ rc = dt_trans_start_local(env, qmt->qmt_child, th);
+ if (rc)
+ GOTO(out, rc);
+
+ EXIT;
+out:
+ if (rc) {
+ dt_trans_stop(env, qmt->qmt_child, th);
+ th = ERR_PTR(rc);
+ LQUOTA_ERROR(lqe, "failed to slv declare write for "DFID
+ ", rc:%d", PFID(lu_object_fid(&slv_obj->do_lu)),
+ rc);
+ } else {
+ restore->qlr_hardlimit = lqe->lqe_hardlimit;
+ restore->qlr_softlimit = lqe->lqe_softlimit;
+ restore->qlr_gracetime = lqe->lqe_gracetime;
+ restore->qlr_granted = lqe->lqe_granted;
+ restore->qlr_qunit = lqe->lqe_qunit;
+ }
+ return th;
+}
+
+/*
+ * Reserve enough credits to update a record in the global index
+ *
+ * \param env - is the environment passed by the caller
+ * \param lqe - is the quota entry to be modified in the global index
+ * \param restore - is a temporary storage for current quota settings which will
+ * be restored if something goes wrong at index update time.
+ */
+struct thandle *qmt_trans_start(const struct lu_env *env,
+ struct lquota_entry *lqe,
+ struct qmt_lqe_restore *restore)
+{
+ LQUOTA_DEBUG(lqe, "declare write");
+ return qmt_trans_start_with_slv(env, lqe, NULL, restore);
+}
+
+/*
+ * Update record associated with a quota entry in the global index.
+ * If LQUOTA_BUMP_VER is set, then the global index version must also be
+ * bumped.
+ * The entry must be at least read locked, dirty and up-to-date.
+ *
+ * \param env - the environment passed by the caller
+ * \param th - is the transaction handle to be used for the disk writes
+ * \param lqe - is the quota entry to udpate
+ * \param obj - is the dt_object associated with the index file
+ * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
+ * \param ver - is used to return the new version of the index.
+ *
+ * \retval - 0 on success and lqe dirty flag cleared,
+ * appropriate error on failure and uptodate flag cleared.
+ */
+int qmt_glb_write(const struct lu_env *env, struct thandle *th,
+ struct lquota_entry *lqe, __u32 flags, __u64 *ver)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct lquota_glb_rec *rec;
+ int rc;
+ ENTRY;
+
+ LASSERT(lqe != NULL);
+ LASSERT(lqe_is_master(lqe));
+ LASSERT(lqe_is_locked(lqe));
+ LASSERT(lqe->lqe_uptodate);
+ LASSERT((flags & ~(LQUOTA_BUMP_VER | LQUOTA_SET_VER)) == 0);
+
+ LQUOTA_DEBUG(lqe, "write glb");
+
+ if (!lqe->lqe_enforced && lqe->lqe_granted == 0 &&
+ lqe->lqe_id.qid_uid != 0) {
+ /* quota isn't enforced any more for this entry and there is no
+ * more space granted to slaves, let's just remove the entry
+ * from the index */
+ rec = NULL;
+ } else {
+ rec = &qti->qti_glb_rec;
+
+ /* fill global index with updated quota settings */
+ rec->qbr_granted = lqe->lqe_granted;
+ rec->qbr_hardlimit = lqe->lqe_hardlimit;
+ rec->qbr_softlimit = lqe->lqe_softlimit;
+ rec->qbr_time = lqe->lqe_gracetime;
+ }
+
+ /* write new quota settings */
+ rc = lquota_disk_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id,
+ (struct dt_rec *)rec, flags, ver);
+ if (rc)
+ /* we failed to write the new quota settings to disk, report
+ * error to caller who will restore the initial value */
+ LQUOTA_ERROR(lqe, "failed to update global index, rc:%d", rc);
+
+ RETURN(rc);
+}
+
+/*
+ * Read from disk how much quota space is allocated to a slave.
+ * This is done by reading records from the dedicated slave index file.
+ * Return in \granted how much quota space is currently allocated to the
+ * slave.
+ * The entry must be at least read locked.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the quota entry associated with the identifier to look-up
+ * in the slave index
+ * \param slv_obj - is the dt_object associated with the slave index
+ * \param granted - is the output parameter where to return how much space
+ * is granted to the slave.
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int qmt_slv_read(const struct lu_env *env, struct lquota_entry *lqe,
+ struct dt_object *slv_obj, __u64 *granted)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct lquota_slv_rec *slv_rec = &qti->qti_slv_rec;
+ int rc;
+ ENTRY;
+
+ LASSERT(lqe != NULL);
+ LASSERT(lqe_is_master(lqe));
+ LASSERT(lqe_is_locked(lqe));
+
+ LQUOTA_DEBUG(lqe, "read slv "DFID,
+ PFID(lu_object_fid(&slv_obj->do_lu)));
+
+ /* read slave record from disk */
+ rc = lquota_disk_read(env, slv_obj, &lqe->lqe_id,
+ (struct dt_rec *)slv_rec);
+ switch (rc) {
+ case -ENOENT:
+ *granted = 0;
+ break;
+ case 0:
+ /* extract granted from on-disk record */
+ *granted = slv_rec->qsr_granted;
+ break;
+ default:
+ LQUOTA_ERROR(lqe, "failed to read slave record "DFID,
+ PFID(lu_object_fid(&slv_obj->do_lu)));
+ RETURN(rc);
+ }
+
+ LQUOTA_DEBUG(lqe, "successful slv read "LPU64, *granted);
+
+ RETURN(0);
+}
+
+/*
+ * Update record in slave index file.
+ * The entry must be at least read locked.
+ *
+ * \param env - the environment passed by the caller
+ * \param th - is the transaction handle to be used for the disk writes
+ * \param lqe - is the dirty quota entry which will be updated at the same time
+ * as the slave index
+ * \param slv_obj - is the dt_object associated with the slave index
+ * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
+ * \param ver - is used to return the new version of the index.
+ * \param granted - is the new amount of quota space owned by the slave
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int qmt_slv_write(const struct lu_env *env, struct thandle *th,
+ struct lquota_entry *lqe, struct dt_object *slv_obj,
+ __u32 flags, __u64 *ver, __u64 granted)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct lquota_slv_rec *rec;
+ int rc;
+ ENTRY;
+
+ LASSERT(lqe != NULL);
+ LASSERT(lqe_is_master(lqe));
+ LASSERT(lqe_is_locked(lqe));
+
+ LQUOTA_DEBUG(lqe, "write slv "DFID" granted:"LPU64,
+ PFID(lu_object_fid(&slv_obj->do_lu)), granted);
+
+ if (granted == 0) {
+ /* this slave does not own any quota space for this ID any more,
+ * so let's just remove the entry from the index */
+ rec = NULL;
+ } else {
+ rec = &qti->qti_slv_rec;
+
+ /* updated space granted to this slave */
+ rec->qsr_granted = granted;
+ }
+
+ /* write new granted space */
+ rc = lquota_disk_write(env, th, slv_obj, &lqe->lqe_id,
+ (struct dt_rec *)rec, flags, ver);
+ if (rc) {
+ LQUOTA_ERROR(lqe, "failed to update slave index "DFID" granted:"
+ LPU64, PFID(lu_object_fid(&slv_obj->do_lu)),
+ granted);
+ RETURN(rc);
+ }
+
+ RETURN(0);
+}
+
+/*
+ * Check whether new limits are valid for this pool
+ *
+ * \param lqe - is the quota entry subject to the setquota
+ * \param hard - is the new hard limit
+ * \param soft - is the new soft limit
+ */
+int qmt_validate_limits(struct lquota_entry *lqe, __u64 hard, __u64 soft)
+{
+ ENTRY;
+
+ if (hard != 0 && soft > hard)
+ /* soft limit must be less than hard limit */
+ RETURN(-EINVAL);
+ RETURN(0);
+}
#include "qmt_internal.h"
/*
+ * Fetch grace time for either inode or block.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier
+ * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode
+ * (i.e. LQUOTA_RES_MD)
+ * \param qtype - is the quota type
+ * \param time - is the output variable where to copy the grace time
+ */
+static int qmt_getinfo(const struct lu_env *env, struct qmt_device *qmt,
+ __u16 pool_id, __u8 restype, __u8 qtype, __u64 *time)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ union lquota_id *id = &qti->qti_id_bis;
+ struct lquota_entry *lqe;
+ ENTRY;
+
+ /* Global grace time is stored in quota settings of ID 0. */
+ id->qid_uid = 0;
+
+ /* look-up quota entry storing grace time */
+ lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
+ if (IS_ERR(lqe))
+ RETURN(PTR_ERR(lqe));
+
+ lqe_read_lock(lqe);
+ LQUOTA_DEBUG(lqe, "getinfo");
+ /* copy grace time */
+ *time = lqe->lqe_gracetime;
+ lqe_read_unlock(lqe);
+
+ lqe_putref(lqe);
+ RETURN(0);
+}
+
+/*
+ * Update grace time for either inode or block.
+ * Global grace time is stored in quota settings of ID 0.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier
+ * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode
+ * (i.e. LQUOTA_RES_MD)
+ * \param qtype - is the quota type
+ * \param time - is the new grace time
+ */
+static int qmt_setinfo(const struct lu_env *env, struct qmt_device *qmt,
+ __u16 pool_id, __u8 restype, __u8 qtype, __u64 time)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ union lquota_id *id = &qti->qti_id_bis;
+ struct lquota_entry *lqe;
+ struct thandle *th = NULL;
+ int rc;
+ ENTRY;
+
+ /* Global grace time is stored in quota settings of ID 0. */
+ id->qid_uid = 0;
+
+ /* look-up quota entry storing the global grace time */
+ lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
+ if (IS_ERR(lqe))
+ RETURN(PTR_ERR(lqe));
+
+ /* allocate & start transaction with enough credits to update grace
+ * time in the global index file */
+ th = qmt_trans_start(env, lqe, &qti->qti_restore);
+ if (IS_ERR(th))
+ GOTO(out_nolock, rc = PTR_ERR(th));
+
+ /* write lock quota entry storing the grace time */
+ lqe_write_lock(lqe);
+ if (lqe->lqe_gracetime == time)
+ /* grace time is the same */
+ GOTO(out, rc = 0);
+
+ LQUOTA_DEBUG(lqe, "setinfo time:"LPU64, time);
+
+ /* set new grace time */
+ lqe->lqe_gracetime = time;
+ /* always set enforced bit for ID 0 to make sure it does not go away */
+ lqe->lqe_enforced = true;
+
+ /* write new grace time to disk, no need for version bump */
+ rc = qmt_glb_write(env, th, lqe, 0, NULL);
+ if (rc) {
+ /* restore initial grace time */
+ qmt_restore(lqe, &qti->qti_restore);
+ GOTO(out, rc);
+ }
+ EXIT;
+out:
+ lqe_write_unlock(lqe);
+out_nolock:
+ lqe_putref(lqe);
+ if (th != NULL && !IS_ERR(th))
+ dt_trans_stop(env, qmt->qmt_child, th);
+ return rc;
+}
+
+/*
+ * Retrieve quota settings for a given identifier.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier
+ * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode
+ * (i.e. LQUOTA_RES_MD)
+ * \param qtype - is the quota type
+ * \param id - is the quota indentifier for which we want to acces quota
+ * settings.
+ * \param hard - is the output variable where to copy the hard limit
+ * \param soft - is the output variable where to copy the soft limit
+ * \param time - is the output variable where to copy the grace time
+ */
+static int qmt_getquota(const struct lu_env *env, struct qmt_device *qmt,
+ __u16 pool_id, __u8 restype, __u8 qtype,
+ union lquota_id *id, __u64 *hard, __u64 *soft,
+ __u64 *time)
+{
+ struct lquota_entry *lqe;
+ ENTRY;
+
+ /* look-up lqe structure containing quota settings */
+ lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
+ if (IS_ERR(lqe))
+ RETURN(PTR_ERR(lqe));
+
+ /* copy quota settings */
+ lqe_read_lock(lqe);
+ LQUOTA_DEBUG(lqe, "getquota");
+ *hard = lqe->lqe_hardlimit;
+ *soft = lqe->lqe_softlimit;
+ *time = lqe->lqe_gracetime;
+ lqe_read_unlock(lqe);
+
+ lqe_putref(lqe);
+ RETURN(0);
+}
+
+/*
+ * Update quota settings for a given identifier.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier
+ * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode
+ * (i.e. LQUOTA_RES_MD)
+ * \param qtype - is the quota type
+ * \param id - is the quota indentifier for which we want to modify quota
+ * settings.
+ * \param hard - is the new hard limit
+ * \param soft - is the new soft limit
+ * \param time - is the new grace time
+ * \param valid - is the list of settings to change
+ */
+static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt,
+ __u16 pool_id, __u8 restype, __u8 qtype,
+ union lquota_id *id, __u64 hard, __u64 soft, __u64 time,
+ __u32 valid)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct lquota_entry *lqe;
+ struct thandle *th = NULL;
+ __u64 grace, ver;
+ bool dirtied = false, bump_version = false;
+ int rc = 0;
+ ENTRY;
+
+ /* fetch global grace time */
+ rc = qmt_getinfo(env, qmt, pool_id, restype, qtype, &grace);
+ if (rc)
+ RETURN(rc);
+
+ /* look-up quota entry associated with this ID */
+ lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
+ if (IS_ERR(lqe))
+ RETURN(PTR_ERR(lqe));
+
+ /* allocate & start transaction with enough credits to update quota
+ * settings in the global index file */
+ th = qmt_trans_start(env, lqe, &qti->qti_restore);
+ if (IS_ERR(th))
+ GOTO(out_nolock, rc = PTR_ERR(th));
+
+ lqe_write_lock(lqe);
+ LQUOTA_DEBUG(lqe, "setquota valid:%x hard:"LPU64" soft:"LPU64
+ " time:"LPU64, valid, hard, soft, time);
+
+ if ((valid & QIF_TIMES) != 0 && lqe->lqe_gracetime != time) {
+ /* change time settings */
+ lqe->lqe_gracetime = time;
+ dirtied = true;
+ }
+
+ if ((valid & QIF_LIMITS) != 0 &&
+ (lqe->lqe_hardlimit != hard || lqe->lqe_softlimit != soft)) {
+ bool enforced = lqe->lqe_enforced;
+
+ rc = qmt_validate_limits(lqe, hard, soft);
+ if (rc)
+ GOTO(out, rc);
+
+ /* change quota limits */
+ lqe->lqe_hardlimit = hard;
+ lqe->lqe_softlimit = soft;
+
+ /* clear grace time */
+ if (lqe->lqe_softlimit == 0 ||
+ lqe->lqe_granted <= lqe->lqe_softlimit)
+ /* no soft limit or below soft limit, let's clear grace
+ * time */
+ lqe->lqe_gracetime = 0;
+ else if ((valid & QIF_TIMES) == 0)
+ /* set grace only if user hasn't provided his own */
+ lqe->lqe_gracetime = cfs_time_current_sec() + grace;
+
+ /* change enforced status based on new parameters */
+ if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
+ lqe->lqe_enforced = false;
+ else
+ lqe->lqe_enforced = true;
+
+ if ((enforced && !lqe->lqe_enforced) ||
+ (!enforced && lqe->lqe_enforced))
+ /* if enforced status has changed, we need to inform
+ * slave, therefore we need to bump the version */
+ bump_version = true;
+
+ dirtied = true;
+ }
+
+ if (dirtied) {
+ /* write new quota settings to disk */
+ rc = qmt_glb_write(env, th, lqe,
+ bump_version ? LQUOTA_BUMP_VER : 0, &ver);
+ if (rc) {
+ /* restore initial quota settings */
+ qmt_restore(lqe, &qti->qti_restore);
+ GOTO(out, rc);
+ }
+ }
+ EXIT;
+out:
+ lqe_write_unlock(lqe);
+out_nolock:
+ lqe_putref(lqe);
+
+ if (th != NULL && !IS_ERR(th))
+ dt_trans_stop(env, qmt->qmt_child, th);
+
+ return rc;
+}
+
+/*
* Handle quotactl request.
*
* \param env - is the environment passed by the caller
static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
struct obd_quotactl *oqctl)
{
+ struct qmt_thread_info *qti = qmt_info(env);
+ union lquota_id *id = &qti->qti_id;
struct qmt_device *qmt = lu2qmt_dev(ld);
+ struct obd_dqblk *dqb = &oqctl->qc_dqblk;
int rc = 0;
ENTRY;
switch (oqctl->qc_cmd) {
- case Q_GETINFO:
- case Q_SETINFO:
- case Q_SETQUOTA:
- /* XXX: not implemented yet. */
- CERROR("quotactl operation %d not implemented yet\n",
- oqctl->qc_cmd);
- RETURN(-EOPNOTSUPP);
-
- case Q_GETQUOTA:
- /* XXX: return no limit for now, just for testing purpose */
- memset(&oqctl->qc_dqblk, 0, sizeof(struct obd_dqblk));
- oqctl->qc_dqblk.dqb_valid = QIF_LIMITS;
- rc = 0;
+ case Q_GETINFO: /* read grace times */
+ /* read inode grace time */
+ rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type,
+ &oqctl->qc_dqinfo.dqi_igrace);
+ if (rc)
+ break;
+
+ /* read block grace time */
+ rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type,
+ &oqctl->qc_dqinfo.dqi_bgrace);
+ break;
+
+ case Q_SETINFO: /* modify grace times */
+ /* setinfo should be using dqi->dqi_valid, but lfs incorrectly
+ * sets the valid flags in dqb->dqb_valid instead, try to live
+ * with that ... */
+ if ((dqb->dqb_valid & QIF_ITIME) != 0) {
+ /* set inode grace time */
+ rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_MD,
+ oqctl->qc_type,
+ oqctl->qc_dqinfo.dqi_igrace);
+ if (rc)
+ break;
+ }
+
+ if ((dqb->dqb_valid & QIF_BTIME) != 0)
+ /* set block grace time */
+ rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_DT,
+ oqctl->qc_type,
+ oqctl->qc_dqinfo.dqi_bgrace);
+ break;
+
+ case Q_GETQUOTA: /* consult quota limit */
+ /* There is no quota limit for root user & group */
+ if (oqctl->qc_id == 0) {
+ memset(dqb, 0, sizeof(*dqb));
+ dqb->dqb_valid = QIF_LIMITS | QIF_TIMES;
+ break;
+ }
+ /* extract quota ID from quotactl request */
+ id->qid_uid = oqctl->qc_id;
+
+ /* look-up inode quota settings */
+ rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type,
+ id, &dqb->dqb_ihardlimit,
+ &dqb->dqb_isoftlimit, &dqb->dqb_itime);
+ if (rc)
+ break;
+
+ dqb->dqb_valid |= QIF_ILIMITS | QIF_ITIME;
+ /* master isn't aware of actual inode usage */
+ dqb->dqb_curinodes = 0;
+
+ /* look-up block quota settings */
+ rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type,
+ id, &dqb->dqb_bhardlimit,
+ &dqb->dqb_bsoftlimit, &dqb->dqb_btime);
+ if (rc)
+ break;
+
+ dqb->dqb_valid |= QIF_BLIMITS | QIF_BTIME;
+ /* master doesn't know the actual block usage */
+ dqb->dqb_curspace = 0;
+ break;
+
+ case Q_SETQUOTA: /* change quota limits */
+ if (oqctl->qc_id == 0)
+ /* can't enforce a quota limit for root user & group */
+ RETURN(-EPERM);
+ /* extract quota ID from quotactl request */
+ id->qid_uid = oqctl->qc_id;
+
+ if ((dqb->dqb_valid & QIF_IFLAGS) != 0) {
+ /* update inode quota settings */
+ rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_MD,
+ oqctl->qc_type, id,
+ dqb->dqb_ihardlimit,
+ dqb->dqb_isoftlimit, dqb->dqb_itime,
+ dqb->dqb_valid & QIF_IFLAGS);
+ if (rc)
+ break;
+ }
+
+ if ((dqb->dqb_valid & QIF_BFLAGS) != 0)
+ /* update block quota settings */
+ rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_DT,
+ oqctl->qc_type, id,
+ dqb->dqb_bhardlimit,
+ dqb->dqb_bsoftlimit, dqb->dqb_btime,
+ dqb->dqb_valid & QIF_BFLAGS);
break;
+ case Q_QUOTAON:
+ case Q_QUOTAOFF: /* quota is always turned on on the master */
+ RETURN(0);
+
+ case LUSTRE_Q_INVALIDATE: /* not supported any more */
+ RETURN(-ENOTSUPP);
+
default:
CERROR("%s: unsupported quotactl command: %d\n",
qmt->qmt_svname, oqctl->qc_cmd);
- RETURN(-EFAULT);
+ RETURN(-ENOTSUPP);
}
RETURN(rc);
if (repbody == NULL)
RETURN(err_serious(-EFAULT));
+ /* XXX: to be implemented */
+
RETURN(0);
}
* index files */
struct obd_export *qmt_child_exp;
struct dt_device *qmt_child;
+
+ /* pointer to ldlm namespace to be used for quota locks */
+ struct ldlm_namespace *qmt_ns;
+
+ /* Hash table containing a qmt_pool_info structure for each pool
+ * this quota master is in charge of. We only have 2 pools in this
+ * hash for the time being:
+ * - one for quota management on the default metadata pool
+ * - one for quota managment on the default data pool
+ *
+ * Once we support quota on non-default pools, then more pools will
+ * be added to this hash table and pool master setup would have to be
+ * handled via configuration logs */
+ cfs_hash_t *qmt_pool_hash;
+
+ /* List of pools managed by this master target */
+ cfs_list_t qmt_pool_list;
+
+ /* procfs root directory for this qmt */
+ cfs_proc_dir_entry_t *qmt_proc;
+
+ unsigned long qmt_stopping:1; /* qmt is stopping */
+
+};
+
+/*
+ * Per-pool quota information.
+ * The qmt creates one such structure for each pool
+ * with quota enforced. All the structures are kept in a hash which is used to
+ * determine whether or not quota is enforced for a given pool.
+ * We currently only support the default data pool and default metadata pool
+ * with the pool_id 0.
+ */
+struct qmt_pool_info {
+ /* link to qmt's pool hash */
+ cfs_hlist_node_t qpi_hash;
+
+ /* chained list of all pools managed by the same qmt */
+ cfs_list_t qpi_linkage;
+
+ /* Pool key composed of pool_id | (pool_type << 16)
+ * Only pool ID 0 is supported for now and the pool type is either
+ * QUOTA_RES_MD or QUOTA_RES_DT.
+ * immutable after creation. */
+ __u32 qpi_key;
+
+ /* track users of this pool instance */
+ cfs_atomic_t qpi_ref;
+
+ /* back pointer to master target
+ * immutable after creation. */
+ struct qmt_device *qpi_qmt;
+
+ /* pointer to dt object associated with global indexes for both user
+ * and group quota */
+ struct dt_object *qpi_glb_obj[MAXQUOTAS];
+
+ /* A pool supports two different quota types: user and group quota.
+ * Each quota type has its own global index and lquota_entry hash table.
+ */
+ struct lquota_site *qpi_site[MAXQUOTAS];
+
+ /* number of slaves registered for each quota types */
+ int qpi_slv_nr[MAXQUOTAS];
+
+ /* procfs root directory for this pool */
+ cfs_proc_dir_entry_t *qpi_proc;
+
+ /* pool directory where all indexes related to this pool instance are
+ * stored */
+ struct dt_object *qpi_root;
+
+ /* Global quota parameters which apply to all quota type */
+ /* the least value of qunit */
+ unsigned long qpi_least_qunit;
+};
+
+/*
+ * Helper routines and prototypes
+ */
+
+/* helper routine to find qmt_pool_info associated a lquota_entry */
+static inline struct qmt_pool_info *lqe2qpi(struct lquota_entry *lqe)
+{
+ LASSERT(lqe_is_master(lqe));
+ return (struct qmt_pool_info *)lqe->lqe_site->lqs_parent;
+}
+
+/* return true if someone holds either a read or write lock on the lqe */
+static inline bool lqe_is_locked(struct lquota_entry *lqe)
+{
+ LASSERT(lqe_is_master(lqe));
+ if (cfs_down_write_trylock(&lqe->lqe_sem) == 0)
+ return true;
+ lqe_write_unlock(lqe);
+ return false;
+}
+
+/* value to be restored if someone wrong happens during lqe writeback */
+struct qmt_lqe_restore {
+ __u64 qlr_hardlimit;
+ __u64 qlr_softlimit;
+ __u64 qlr_gracetime;
+ __u64 qlr_granted;
+ __u64 qlr_qunit;
};
/* Common data shared by qmt handlers */
struct qmt_thread_info {
union lquota_rec qti_rec;
union lquota_id qti_id;
+ union lquota_id qti_id_bis;
+ char qti_buf[MTI_NAME_MAXLEN];
+ struct lu_fid qti_fid;
+ struct ldlm_res_id qti_resid;
+ union ldlm_gl_desc qti_gl_desc;
+ struct quota_body qti_body;
+ struct quota_body qti_repbody;
+ struct qmt_lqe_restore qti_restore;
};
extern struct lu_context_key qmt_thread_key;
return &qmt->qmt_dt_dev.dd_lu_dev;
}
+#define LQE_ROOT(lqe) (lqe2qpi(lqe)->qpi_root)
+#define LQE_GLB_OBJ(lqe) (lqe2qpi(lqe)->qpi_glb_obj[lqe->lqe_site->lqs_qtype])
+
+static inline void qmt_restore(struct lquota_entry *lqe,
+ struct qmt_lqe_restore *restore)
+{
+ lqe->lqe_hardlimit = restore->qlr_hardlimit;
+ lqe->lqe_softlimit = restore->qlr_softlimit;
+ lqe->lqe_gracetime = restore->qlr_gracetime;
+ lqe->lqe_granted = restore->qlr_granted;
+ lqe->lqe_qunit = restore->qlr_qunit;
+}
+
+/* qmt_pool.c */
+void qmt_pool_fini(const struct lu_env *, struct qmt_device *);
+int qmt_pool_init(const struct lu_env *, struct qmt_device *);
+int qmt_pool_prepare(const struct lu_env *, struct qmt_device *,
+ struct dt_object *);
+int qmt_pool_new_conn(const struct lu_env *, struct qmt_device *,
+ struct lu_fid *, struct lu_fid *, __u64 *,
+ struct obd_uuid *);
+struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *,
+ struct qmt_device *, int, int, int,
+ union lquota_id *);
+/* qmt_entry.c */
+extern struct lquota_entry_operations qmt_lqe_ops;
+struct thandle *qmt_trans_start_with_slv(const struct lu_env *,
+ struct lquota_entry *,
+ struct dt_object *,
+ struct qmt_lqe_restore *);
+struct thandle *qmt_trans_start(const struct lu_env *, struct lquota_entry *,
+ struct qmt_lqe_restore *);
+int qmt_glb_write(const struct lu_env *, struct thandle *,
+ struct lquota_entry *, __u32, __u64 *);
+int qmt_slv_write(const struct lu_env *, struct thandle *,
+ struct lquota_entry *, struct dt_object *, __u32, __u64 *,
+ __u64);
+int qmt_slv_read(const struct lu_env *, struct lquota_entry *,
+ struct dt_object *, __u64 *);
+int qmt_validate_limits(struct lquota_entry *, __u64, __u64);
+
/* qmt_lock.c */
int qmt_intent_policy(const struct lu_env *, struct lu_device *,
struct ptlrpc_request *, struct ldlm_lock **, int);
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu Yawei <yawei.niu@intel.com>
+ */
+
+/*
+ * A Quota Master Target has a hash table where it stores qmt_pool_info
+ * structures. There is one such structure for each pool managed by the QMT.
+ *
+ * Each pool can have different quota types enforced (typically user & group
+ * quota). A pool is in charge of managing lquota_entry structures for each
+ * quota type. This is done by creating one lquota_entry site per quota
+ * type. A site stores entries in a hash table and read quota settings from disk
+ * when a given ID isn't present in the hash.
+ *
+ * The pool API exported here is the following:
+ * - qmt_pool_init(): initializes the general QMT structures used to manage
+ * pools.
+ * - qmt_pool_fini(): frees the structures allocated by qmt_pool_fini().
+ * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
+ * - qmt_pool_new_conn(): is used to create a new slave index file.
+ * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
+ * a given ID.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <obd_class.h>
+#include <lprocfs_status.h>
+#include "qmt_internal.h"
+
+static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
+
+/*
+ * Static helper functions not used outside the scope of this file
+ */
+
+/*
+ * Reference counter management for qmt_pool_info structures
+ */
+static inline void qpi_getref(struct qmt_pool_info *pool)
+{
+ cfs_atomic_inc(&pool->qpi_ref);
+}
+
+static inline void qpi_putref(const struct lu_env *env,
+ struct qmt_pool_info *pool)
+{
+ LASSERT(atomic_read(&pool->qpi_ref) > 0);
+ if (cfs_atomic_dec_and_test(&pool->qpi_ref))
+ qmt_pool_free(env, pool);
+}
+
+static inline void qpi_putref_locked(struct qmt_pool_info *pool)
+{
+ LASSERT(cfs_atomic_read(&pool->qpi_ref) > 1);
+ cfs_atomic_dec(&pool->qpi_ref);
+}
+
+/*
+ * Hash functions for qmt_pool_info management
+ */
+
+static unsigned qpi_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+{
+ return cfs_hash_u32_hash(*((__u32 *)key), mask);
+}
+
+static void *qpi_hash_key(cfs_hlist_node_t *hnode)
+{
+ struct qmt_pool_info *pool;
+ pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+ return &pool->qpi_key;
+}
+
+static int qpi_hash_keycmp(const void *key, cfs_hlist_node_t *hnode)
+{
+ struct qmt_pool_info *pool;
+ pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+ return pool->qpi_key == *((__u32 *)key);
+}
+
+static void *qpi_hash_object(cfs_hlist_node_t *hnode)
+{
+ return cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+}
+
+static void qpi_hash_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ struct qmt_pool_info *pool;
+ pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+ qpi_getref(pool);
+}
+
+static void qpi_hash_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ struct qmt_pool_info *pool;
+ pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
+ qpi_putref_locked(pool);
+}
+
+static void qpi_hash_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ CERROR("Should not have any item left!\n");
+}
+
+/* vector of hash operations */
+static cfs_hash_ops_t qpi_hash_ops = {
+ .hs_hash = qpi_hash_hash,
+ .hs_key = qpi_hash_key,
+ .hs_keycmp = qpi_hash_keycmp,
+ .hs_object = qpi_hash_object,
+ .hs_get = qpi_hash_get,
+ .hs_put_locked = qpi_hash_put_locked,
+ .hs_exit = qpi_hash_exit
+};
+
+/* some procfs helpers */
+static int lprocfs_qpi_rd_state(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct qmt_pool_info *pool = (struct qmt_pool_info *)data;
+ int type, i = 0;
+
+ LASSERT(pool != NULL);
+
+ i = snprintf(page, count,
+ "pool:\n"
+ " id: %u\n"
+ " type: %s\n"
+ " ref: %d\n"
+ " least qunit: %lu\n",
+ pool->qpi_key & 0x0000ffff,
+ RES_NAME(pool->qpi_key >> 16),
+ cfs_atomic_read(&pool->qpi_ref),
+ pool->qpi_least_qunit);
+
+
+ for (type = 0; type < MAXQUOTAS; type++)
+ i += snprintf(page + i, count - i,
+ " %s:\n"
+ " #slv: %d\n"
+ " #lqe: %d\n",
+ QTYPE_NAME(type),
+ pool->qpi_slv_nr[type],
+ cfs_atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
+
+ return i;
+}
+
+static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
+ { "info", lprocfs_qpi_rd_state, 0, 0},
+ { NULL }
+};
+
+/*
+ * Allocate a new qmt_pool_info structure and add it to the pool hash table
+ * of the qmt.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ * \param pool_id - is the 16-bit pool identifier of the new pool to add
+ * \param pool_type - is the resource type of this pool instance, either
+ * LQUOTA_RES_MD or LQUOTA_RES_DT.
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
+ int pool_id, int pool_type)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct qmt_pool_info *pool;
+ int rc = 0;
+ ENTRY;
+
+ OBD_ALLOC_PTR(pool);
+ if (pool == NULL)
+ RETURN(-ENOMEM);
+ CFS_INIT_LIST_HEAD(&pool->qpi_linkage);
+
+ /* assign key used by hash functions */
+ pool->qpi_key = pool_id + (pool_type << 16);
+
+ /* initialize refcount to 1, hash table will then grab an additional
+ * reference */
+ atomic_set(&pool->qpi_ref, 1);
+
+ /* set up least qunit size to use for this pool */
+ pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
+
+ /* create pool proc directory */
+ sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
+ pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
+ lprocfs_quota_qpi_vars, pool);
+ if (IS_ERR(pool->qpi_proc)) {
+ rc = PTR_ERR(pool->qpi_proc);
+ CERROR("%s: failed to create proc entry for pool %s (%d)\n",
+ qmt->qmt_svname, qti->qti_buf, rc);
+ pool->qpi_proc = NULL;
+ GOTO(out, rc);
+ }
+
+ /* grab reference on master target that this pool belongs to */
+ lu_device_get(qmt2lu_dev(qmt));
+ lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
+ pool->qpi_qmt = qmt;
+
+ /* add to qmt hash table */
+ rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key,
+ &pool->qpi_hash);
+ if (rc) {
+ CERROR("%s: failed to add pool %s to qmt hash (%d)\n",
+ qmt->qmt_svname, qti->qti_buf, rc);
+ GOTO(out, rc);
+ }
+
+ /* add to qmt pool list */
+ cfs_list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
+ EXIT;
+out:
+ if (rc)
+ /* this frees the pool structure since refcount is equal to 1 */
+ qpi_putref(env, pool);
+ return rc;
+}
+
+/*
+ * Delete a qmt_pool_info instance and all structures associated.
+ *
+ * \param env - is the environment passed by the caller
+ * \param pool - is the qmt_pool_info structure to free
+ */
+static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
+{
+ int qtype;
+ ENTRY;
+
+ /* release proc entry */
+ if (pool->qpi_proc) {
+ lprocfs_remove(&pool->qpi_proc);
+ pool->qpi_proc = NULL;
+ }
+
+ /* release per-quota type site used to manage quota entries as well as
+ * references to global index files */
+ for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+ /* release site */
+ if (pool->qpi_site[qtype] != NULL &&
+ !IS_ERR(pool->qpi_site[qtype]))
+ lquota_site_free(env, pool->qpi_site[qtype]);
+ /* release reference to global index */
+ if (pool->qpi_glb_obj[qtype] != NULL &&
+ !IS_ERR(pool->qpi_glb_obj[qtype]))
+ lu_object_put(env, &pool->qpi_glb_obj[qtype]->do_lu);
+ }
+
+ /* release reference on pool directory */
+ if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
+ lu_object_put(env, &pool->qpi_root->do_lu);
+
+ /* release reference on the master target */
+ if (pool->qpi_qmt != NULL) {
+ struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
+
+ lu_device_put(ld);
+ lu_ref_del(&ld->ld_reference, "pool", pool);
+ pool->qpi_qmt = NULL;
+ }
+
+ LASSERT(cfs_list_empty(&pool->qpi_linkage));
+ OBD_FREE_PTR(pool);
+}
+
+/*
+ * Look-up a pool in the hash table based on the pool ID and type.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ * \param pool_id - is the 16-bit identifier of the pool to look up
+ * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
+ * LQUOTA_RES_DT.
+ */
+static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
+ struct qmt_device *qmt,
+ int pool_id, int pool_type)
+{
+ struct qmt_pool_info *pool;
+ __u32 key;
+ ENTRY;
+
+ LASSERT(qmt->qmt_pool_hash != NULL);
+
+ /* look-up pool in hash table */
+ key = pool_id + (pool_type << 16);
+ pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key);
+ if (pool == NULL) {
+ /* this qmt isn't managing this pool! */
+ CERROR("%s: looking up quota entry for a pool (0x%x/%d) which "
+ "isn't managed by this quota master target\n",
+ qmt->qmt_svname, pool_id, pool_type);
+ RETURN(ERR_PTR(-ENOENT));
+ }
+ RETURN(pool);
+}
+
+/*
+ * Functions implementing the pool API, used by the qmt handlers
+ */
+
+/*
+ * Destroy all pools which are still in the hash table and free the pool
+ * hash table.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target
+ *
+ */
+void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
+{
+ struct qmt_pool_info *pool;
+ cfs_list_t *pos, *n;
+ ENTRY;
+
+ if (qmt->qmt_pool_hash == NULL)
+ RETURN_EXIT;
+
+ /* parse list of pool and destroy each element */
+ cfs_list_for_each_safe(pos, n, &qmt->qmt_pool_list) {
+ pool = cfs_list_entry(pos, struct qmt_pool_info,
+ qpi_linkage);
+ /* remove from hash */
+ cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key,
+ &pool->qpi_hash);
+
+ /* remove from list */
+ cfs_list_del_init(&pool->qpi_linkage);
+
+ /* release extra reference taken in qmt_pool_alloc */
+ qpi_putref(env, pool);
+ }
+ LASSERT(cfs_list_empty(&qmt->qmt_pool_list));
+
+ cfs_hash_putref(qmt->qmt_pool_hash);
+ qmt->qmt_pool_hash = NULL;
+ EXIT;
+}
+
+/*
+ * Initialize pool configure for the quota master target. For now, we only
+ * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
+ * pool which are instantiated in this function.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target for which we have to initializa the
+ * pool configuration
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
+{
+ int res, rc = 0;
+ ENTRY;
+
+ /* initialize pool hash table */
+ qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH",
+ HASH_POOLS_CUR_BITS,
+ HASH_POOLS_MAX_BITS,
+ HASH_POOLS_BKT_BITS, 0,
+ CFS_HASH_MIN_THETA,
+ CFS_HASH_MAX_THETA,
+ &qpi_hash_ops,
+ CFS_HASH_DEFAULT);
+ if (qmt->qmt_pool_hash == NULL) {
+ CERROR("%s: failed to create pool hash table\n",
+ qmt->qmt_svname);
+ RETURN(-ENOMEM);
+ }
+
+ /* initialize pool list */
+ CFS_INIT_LIST_HEAD(&qmt->qmt_pool_list);
+
+ /* Instantiate pool master for the default data and metadata pool (both
+ * have pool ID equals to 0).
+ * This code will have to be revisited once we support quota on
+ * non-default pools */
+ for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
+ rc = qmt_pool_alloc(env, qmt, 0, res);
+ if (rc)
+ break;
+ }
+
+ if (rc)
+ qmt_pool_fini(env, qmt);
+
+ RETURN(rc);
+}
+
+static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
+ char *slv_name, struct lu_fid *slv_fid, void *arg)
+{
+ int *nr = arg;
+
+ /* one more slave */
+ (*nr)++;
+
+ return 0;
+}
+
+/*
+ * Set up on-disk index files associated with each pool.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target for which we have to initializa the
+ * pool configuration
+ * \param qmt_root - is the on-disk directory created for the QMT.
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
+ struct dt_object *qmt_root)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct qmt_pool_info *pool;
+ struct dt_device *dev = NULL;
+ cfs_list_t *pos;
+ int rc = 0, qtype;
+ ENTRY;
+
+ LASSERT(qmt->qmt_pool_hash != NULL);
+
+ /* iterate over each pool in the hash and allocate a quota site for each
+ * one. This involves creating a global index file on disk */
+ cfs_list_for_each(pos, &qmt->qmt_pool_list) {
+ struct dt_object *obj;
+ int pool_type, pool_id;
+
+ pool = cfs_list_entry(pos, struct qmt_pool_info,
+ qpi_linkage);
+
+ pool_id = pool->qpi_key & 0x0000ffff;
+ pool_type = pool->qpi_key >> 16;
+ if (dev == NULL)
+ dev = pool->qpi_qmt->qmt_child;
+
+ /* allocate directory for this pool */
+ sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
+ obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
+ qti->qti_buf);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
+ pool->qpi_root = obj;
+
+ for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+ /* Generating FID of global index in charge of storing
+ * settings for this quota type */
+ lquota_generate_fid(&qti->qti_fid, pool_id, pool_type,
+ qtype);
+
+ /* open/create the global index file for this quota
+ * type */
+ obj = lquota_disk_glb_find_create(env, dev,
+ pool->qpi_root,
+ &qti->qti_fid, false);
+ if (IS_ERR(obj)) {
+ rc = PTR_ERR(obj);
+ CERROR("%s: failed to create glb index copy for"
+ " %s type (%d)\n", qmt->qmt_svname,
+ QTYPE_NAME(qtype), rc);
+ RETURN(rc);
+ }
+
+ pool->qpi_glb_obj[qtype] = obj;
+
+ /* create quota entry site for this quota type */
+ pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
+ true, qtype,
+ &qmt_lqe_ops);
+ if (IS_ERR(pool->qpi_site[qtype])) {
+ rc = PTR_ERR(pool->qpi_site[qtype]);
+ CERROR("%s: failed to create site for %s type "
+ "(%d)\n", qmt->qmt_svname,
+ QTYPE_NAME(qtype), rc);
+ RETURN(rc);
+ }
+
+ /* count number of slaves which already connected to
+ * the master in the past */
+ pool->qpi_slv_nr[qtype] = 0;
+ rc = lquota_disk_for_each_slv(env, pool->qpi_root,
+ &qti->qti_fid,
+ qmt_slv_cnt,
+ &pool->qpi_slv_nr[qtype]);
+ if (rc) {
+ CERROR("%s: failed to scan & count slave "
+ "indexes for %s type (%d)\n",
+ qmt->qmt_svname, QTYPE_NAME(qtype), rc);
+ RETURN(rc);
+ }
+#ifdef LPROCFS
+ /* add procfs file to dump the global index, mostly for
+ * debugging purpose */
+ sprintf(qti->qti_buf, "glb-%s", QTYPE_NAME(qtype));
+ rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
+ 0444, &lprocfs_quota_seq_fops,
+ obj);
+ if (rc)
+ CWARN("%s: Error adding procfs file for global"
+ "quota index "DFID", rc:%d\n",
+ qmt->qmt_svname, PFID(&qti->qti_fid), rc);
+#endif
+ }
+ }
+
+ RETURN(0);
+}
+
+/*
+ * Handle new slave connection. Called when a slave enqueues the global quota
+ * lock at the beginning of the reintegration procedure.
+ *
+ * \param env - is the environment passed by the caller
+ * \parap qmt - is the quota master target handling this request
+ * \param glb_fid - is the fid of the global index file
+ * \param slv_fid - is the fid of the newly created slave index file
+ * \param slv_ver - is the current version of the slave index file
+ * \param uuid - is the uuid of slave which is (re)connecting to the master
+ * target
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
+ struct lu_fid *glb_fid, struct lu_fid *slv_fid,
+ __u64 *slv_ver, struct obd_uuid *uuid)
+{
+ struct qmt_pool_info *pool;
+ struct dt_object *slv_obj;
+ int pool_id, pool_type, qtype;
+ bool created = false;
+ int rc = 0;
+
+ /* extract pool info from global index FID */
+ rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype);
+ if (rc)
+ RETURN(rc);
+
+ /* look-up pool in charge of this global index FID */
+ pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
+ if (IS_ERR(pool))
+ RETURN(PTR_ERR(pool));
+
+ /* look-up slave index file */
+ slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
+ glb_fid, uuid);
+ if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
+ /* create slave index file */
+ slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
+ pool->qpi_root, glb_fid,
+ uuid, false);
+ created = true;
+ }
+ if (IS_ERR(slv_obj)) {
+ rc = PTR_ERR(slv_obj);
+ CERROR("%s: failed to create quota slave index file for %s (%d)"
+ "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
+ GOTO(out, rc);
+ }
+
+ /* retrieve slave fid & current object version */
+ memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
+ *slv_ver = dt_version_get(env, slv_obj);
+ lu_object_put(env, &slv_obj->do_lu);
+ if (created)
+ pool->qpi_slv_nr[qtype]++;
+out:
+ qpi_putref(env, pool);
+ RETURN(rc);
+}
+
+/*
+ * Look-up a lquota_entry in the pool hash and allocate it if not found.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target for which we have to initializa the
+ * pool configuration
+ * \param pool_id - is the 16-bit identifier of the pool
+ * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
+ * \param qtype - is the quota type, either user or group.
+ * \param qid - is the quota ID to look-up
+ *
+ * \retval - valid pointer to lquota entry on success, appropriate error on
+ * failure
+ */
+struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
+ struct qmt_device *qmt,
+ int pool_id, int pool_type,
+ int qtype, union lquota_id *qid)
+{
+ struct qmt_pool_info *pool;
+ struct lquota_entry *lqe;
+ ENTRY;
+
+ /* look-up pool responsible for this global index FID */
+ pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
+ if (IS_ERR(pool))
+ RETURN((void *)pool);
+
+ /* now that we have the pool, let's look-up the quota entry in the
+ * right quota site */
+ lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
+
+ qpi_putref(env, pool);
+ RETURN(lqe);
+}