From: Johann Lombardi Date: Tue, 2 Oct 2012 16:11:21 +0000 (+0200) Subject: LU-1842 quota: add quotactl support on qmt X-Git-Tag: 2.3.53~31 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=294aa9cb666c48e02da1057c222fe5f206ce38fc LU-1842 quota: add quotactl support on qmt This patch implements quotactl ({get,set}{quota,info}) support on the QMT. This involves defining the lqe operations for the qmt as well as pool operations in order to support per-pool quota in the future. Signed-off-by: Johann Lombardi Change-Id: I122e7cb85eb6d1d3460e86217e2a6723763c8cbf Reviewed-on: http://review.whamcloud.com/4160 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Niu Yawei Reviewed-by: Mike Pershin Reviewed-by: Oleg Drokin --- diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index c484953..a55e947 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -936,6 +936,7 @@ void ldlm_namespace_get(struct ldlm_namespace *ns) { cfs_atomic_inc(&ns->ns_bref); } +EXPORT_SYMBOL(ldlm_namespace_get); void ldlm_namespace_put(struct ldlm_namespace *ns) { @@ -944,6 +945,7 @@ void ldlm_namespace_put(struct ldlm_namespace *ns) cfs_spin_unlock(&ns->ns_lock); } } +EXPORT_SYMBOL(ldlm_namespace_put); /* Register @ns in the list of namespaces */ void ldlm_namespace_register(struct ldlm_namespace *ns, ldlm_side_t client) diff --git a/lustre/quota/Makefile.in b/lustre/quota/Makefile.in index 193f727..7b61887 100644 --- a/lustre/quota/Makefile.in +++ b/lustre/quota/Makefile.in @@ -1,6 +1,6 @@ MODULES := lquota -qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o +qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o qmt_entry.o qmt_pool.o quota-objs := lproc_quota.o lquota_lib.o lquota_disk.o lquota_entry.o diff --git a/lustre/quota/lproc_quota.c b/lustre/quota/lproc_quota.c index 581cf63..9c65526 100644 --- a/lustre/quota/lproc_quota.c +++ b/lustre/quota/lproc_quota.c @@ -14,24 +14,18 @@ * in the LICENSE file that accompanied this code). 
* * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * version 2 along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA * * GPL HEADER END */ /* - * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012 Intel, Inc. * Use is subject to license terms. * - * Copyright (c) 2011, Whamcloud, Inc. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. + * Author: Johann Lombardi + * Author: Niu Yawei */ #define DEBUG_SUBSYSTEM S_LQUOTA @@ -167,20 +161,22 @@ static void *lprocfs_quota_seq_next(struct seq_file *p, void *v, loff_t *pos) /* * Output example: * - * user_accounting: + * usr_accounting: * - id: 0 - * usage: { inodes: 209, bytes: 26161152 } + * usage: { inodes: 209, kbytes: 2616 } * - id: 840000038 - * usage: { inodes: 1, bytes: 10485760 } + * usage: { inodes: 1, kbytes: 1048 } */ static int lprocfs_quota_seq_show(struct seq_file *p, void *v) { - struct lquota_procfs *lqp = p->private; - const struct dt_it_ops *iops; - struct dt_it *it; - struct dt_key *key; - struct lquota_acct_rec rec; - int rc; + struct lquota_procfs *lqp = p->private; + struct lquota_thread_info *qti = lquota_info(&lqp->lqp_env); + const struct dt_it_ops *iops; + struct dt_it *it; + struct dt_key *key; + struct dt_rec *rec = (struct dt_rec *)&qti->qti_rec; + const struct lu_fid *fid; + int rc; LASSERT(lqp); if (lqp->lqp_obj == NULL) { @@ -188,14 +184,26 @@ static int lprocfs_quota_seq_show(struct seq_file *p, void *v) return 0; } - if (v == SEQ_START_TOKEN) { - const struct lu_fid *fid = 
lu_object_fid(&lqp->lqp_obj->do_lu); + fid = lu_object_fid(&lqp->lqp_obj->do_lu); - LASSERT(fid_is_acct(fid)); - if (fid_oid(fid) == ACCT_USER_OID) - seq_printf(p, "user_accounting:\n"); - else - seq_printf(p, "group_accounting:\n"); + if (v == SEQ_START_TOKEN) { + if (fid_is_acct(fid)) { + if (fid_oid(fid) == ACCT_USER_OID) + seq_printf(p, "usr_accounting:\n"); + else + seq_printf(p, "grp_accounting:\n"); + } else if (fid_seq(fid) == FID_SEQ_QUOTA_GLB) { + int poolid, rtype, qtype; + + rc = lquota_extract_fid(fid, &poolid, &rtype, &qtype); + if (rc) + return rc; + + seq_printf(p, "global_pool%d_%s_%s\n", poolid, + RES_NAME(rtype), QTYPE_NAME(qtype)); + } else { + return -ENOTSUPP; + } return 0; } @@ -210,7 +218,7 @@ static int lprocfs_quota_seq_show(struct seq_file *p, void *v) return PTR_ERR(key); } - rc = iops->rec(&lqp->lqp_env, it, (struct dt_rec *)&rec, 0); + rc = iops->rec(&lqp->lqp_env, it, rec, 0); if (rc) { CERROR("%s: failed to get rec: rc = %d\n", lqp->lqp_obj->do_lu.lo_dev->ld_obd->obd_name, rc); @@ -218,8 +226,19 @@ static int lprocfs_quota_seq_show(struct seq_file *p, void *v) } seq_printf(p, "- %-8s %llu\n", "id:", *((__u64 *)key)); - seq_printf(p, " %-8s { inodes: %20"LPF64"u, bytes: %20"LPF64"u }\n", - "usage:", rec.ispace, rec.bspace); + if (fid_is_acct(fid)) + seq_printf(p, " %-8s { inodes: %20"LPF64"u, kbytes: %20"LPF64 + "u }\n", "usage:", + ((struct lquota_acct_rec *)rec)->ispace, + toqb(((struct lquota_acct_rec *)rec)->bspace)); + else if (fid_seq(fid) == FID_SEQ_QUOTA_GLB) + seq_printf(p, " %-8s { hard: %20"LPF64"u, soft: %20"LPF64 + "u, granted: %20"LPF64"u, time: %20"LPF64"u }\n", + "limits:", + ((struct lquota_glb_rec *)rec)->qbr_hardlimit, + ((struct lquota_glb_rec *)rec)->qbr_softlimit, + ((struct lquota_glb_rec *)rec)->qbr_granted, + ((struct lquota_glb_rec *)rec)->qbr_time); return 0; } diff --git a/lustre/quota/lquota_disk.c b/lustre/quota/lquota_disk.c index 6421bc4..777b2ab 100644 --- a/lustre/quota/lquota_disk.c +++ 
b/lustre/quota/lquota_disk.c @@ -92,18 +92,6 @@ lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev, obj = dt_locate(env, dev, &qti->qti_fid); if (IS_ERR(obj)) GOTO(out, obj); - - /* install index operation vector */ - if (obj->do_index_ops == NULL) { - rc = obj->do_ops->do_index_try(env, obj, idx_feat); - if (rc) { - CERROR("%s: fail to setup index operations for "DFID - " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name, - PFID(lu_object_fid(&obj->do_lu)), rc); - lu_object_put(env, &obj->do_lu); - obj = ERR_PTR(rc); - } - } out: local_oid_storage_fini(env, los); RETURN(obj); @@ -291,10 +279,26 @@ struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env, idx_feat); } - if (IS_ERR(glb_idx)) + if (IS_ERR(glb_idx)) { CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld " "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name, PFID(fid), PTR_ERR(glb_idx), local); + RETURN(glb_idx); + } + + /* install index operation vector */ + if (glb_idx->do_index_ops == NULL) { + int rc; + + rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat); + if (rc) { + CERROR("%s: failed to setup index operations for "DFID + " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name, + PFID(lu_object_fid(&glb_idx->do_lu)), rc); + lu_object_put(env, &glb_idx->do_lu); + glb_idx = ERR_PTR(rc); + } + } RETURN(glb_idx); } @@ -435,6 +439,22 @@ struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env, qti->qti_buf); } + if (IS_ERR(slv_idx)) + RETURN(slv_idx); + + /* install index operation vector */ + if (slv_idx->do_index_ops == NULL) { + rc = slv_idx->do_ops->do_index_try(env, slv_idx, + &dt_quota_slv_features); + if (rc) { + CERROR("%s: failed to setup index operations for "DFID + " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name, + PFID(lu_object_fid(&slv_idx->do_lu)), rc); + lu_object_put(env, &slv_idx->do_lu); + slv_idx = ERR_PTR(rc); + } + } + RETURN(slv_idx); } diff --git a/lustre/quota/lquota_entry.c b/lustre/quota/lquota_entry.c index 830e9cf..787ca84 100644 --- 
a/lustre/quota/lquota_entry.c +++ b/lustre/quota/lquota_entry.c @@ -41,7 +41,6 @@ static int hash_lqs_cur_bits = HASH_LQE_CUR_BITS; CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444, "the current bits of lqe hash"); -cfs_mem_cache_t *lqe_kmem; static unsigned lqe64_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask) { diff --git a/lustre/quota/lquota_internal.h b/lustre/quota/lquota_internal.h index b75e5ce..7fedd29 100644 --- a/lustre/quota/lquota_internal.h +++ b/lustre/quota/lquota_internal.h @@ -32,6 +32,7 @@ #define _LQUOTA_INTERNAL_H #define QTYPE_NAME(qtype) ((qtype) == USRQUOTA ? "usr" : "grp") +#define RES_NAME(res) ((res) == LQUOTA_RES_MD ? "md" : "dt") #define QIF_IFLAGS (QIF_INODES | QIF_ITIME | QIF_ILIMITS) #define QIF_BFLAGS (QIF_SPACE | QIF_BTIME | QIF_BLIMITS) @@ -254,6 +255,14 @@ static inline void lqe_read_unlock(struct lquota_entry *lqe) cfs_read_unlock(&lqe->lqe_lock); } +/* + * Helper functions & prototypes + */ + +/* minimum qunit size, 1K inode for metadata pool and 1MB for data pool */ +#define LQUOTA_LEAST_QUNIT(type) \ + (type == LQUOTA_RES_MD ? (1 << 10) : toqb(PTLRPC_MAX_BRW_SIZE)) + /* Common data shared by quota-level handlers. 
This is allocated per-thread to * reduce stack consumption */ struct lquota_thread_info { @@ -335,8 +344,9 @@ void lquota_lqe_debug0(struct lquota_entry *lqe, struct dt_object *acct_obj_lookup(const struct lu_env *, struct dt_device *, int); void lquota_generate_fid(struct lu_fid *, int, int, int); -int lquota_extract_fid(struct lu_fid *, int *, int *, int *); +int lquota_extract_fid(const struct lu_fid *, int *, int *, int *); const struct dt_index_features *glb_idx_feature(struct lu_fid *); +extern cfs_mem_cache_t *lqe_kmem; /* lquota_entry.c */ /* site create/destroy */ diff --git a/lustre/quota/lquota_lib.c b/lustre/quota/lquota_lib.c index 67145e3..bc0716e 100644 --- a/lustre/quota/lquota_lib.c +++ b/lustre/quota/lquota_lib.c @@ -40,6 +40,19 @@ #include "lquota_internal.h" +cfs_mem_cache_t *lqe_kmem; + +struct lu_kmem_descr lquota_caches[] = { + { + .ckd_cache = &lqe_kmem, + .ckd_name = "lqe_kmem", + .ckd_size = sizeof(struct lquota_entry) + }, + { + .ckd_cache = NULL + } +}; + /* register lquota key */ LU_KEY_INIT_FINI(lquota, struct lquota_thread_info); LU_CONTEXT_KEY_DEFINE(lquota, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL); @@ -244,7 +257,7 @@ void lquota_generate_fid(struct lu_fid *fid, int pool_id, int pool_type, * Helper routine used to extract pool ID, pool type and quota type from a * given FID. 
*/ -int lquota_extract_fid(struct lu_fid *fid, int *pool_id, int *pool_type, +int lquota_extract_fid(const struct lu_fid *fid, int *pool_id, int *pool_type, int *quota_type) { unsigned int tmp; @@ -338,10 +351,14 @@ static int __init init_lquota(void) #warning "remove old quota compatibility code" #endif - rc = qmt_glb_init(); + rc = lu_kmem_init(lquota_caches); if (rc) GOTO(out_key, rc); + rc = qmt_glb_init(); + if (rc) + GOTO(out_caches, rc); + rc = qsd_glb_init(); if (rc) GOTO(out_qmt, rc); @@ -350,6 +367,8 @@ static int __init init_lquota(void) out_qmt: qmt_glb_fini(); +out_caches: + lu_kmem_fini(lquota_caches); out_key: lu_context_key_degister(&lquota_thread_key); return rc; @@ -359,6 +378,7 @@ static void exit_lquota(void) { qsd_glb_fini(); qmt_glb_fini(); + lu_kmem_fini(lquota_caches); lu_context_key_degister(&lquota_thread_key); } diff --git a/lustre/quota/qmt_dev.c b/lustre/quota/qmt_dev.c index e165ae7..701cc16 100644 --- a/lustre/quota/qmt_dev.c +++ b/lustre/quota/qmt_dev.c @@ -99,12 +99,29 @@ static struct lu_device *qmt_device_fini(const struct lu_env *env, LASSERT(qmt != NULL); CDEBUG(D_QUOTA, "%s: initiating QMT shutdown\n", qmt->qmt_svname); + qmt->qmt_stopping = true; + + /* remove qmt proc entry */ + if (qmt->qmt_proc != NULL && !IS_ERR(qmt->qmt_proc)) { + lprocfs_remove(&qmt->qmt_proc); + qmt->qmt_proc = NULL; + } + + /* kill pool instances, if any */ + qmt_pool_fini(env, qmt); /* disconnect from OSD */ if (qmt->qmt_child_exp != NULL) { obd_disconnect(qmt->qmt_child_exp); qmt->qmt_child_exp = NULL; - qmt->qmt_child = NULL; + qmt->qmt_child = NULL; + } + + /* release reference on MDT namespace */ + if (ld->ld_obd->obd_namespace != NULL) { + ldlm_namespace_put(ld->ld_obd->obd_namespace); + ld->ld_obd->obd_namespace = NULL; + qmt->qmt_ns = NULL; } RETURN(NULL); @@ -159,7 +176,7 @@ static int qmt_connect_to_osd(const struct lu_env *env, struct qmt_device *qmt, /* initialize site (although it isn't used anywhere) and lu_device * pointer to next device 
*/ qmt->qmt_child = lu2dt_dev(qmt->qmt_child_exp->exp_obd->obd_lu_dev); - ld->ld_site = qmt->qmt_child_exp->exp_obd->obd_lu_dev->ld_site; + ld->ld_site = qmt->qmt_child_exp->exp_obd->obd_lu_dev->ld_site; EXIT; out: if (data) @@ -187,7 +204,8 @@ static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt, struct lu_device_type *ldt, struct lustre_cfg *cfg) { struct lu_device *ld = qmt2lu_dev(qmt); - struct obd_device *obd; + struct obd_device *obd, *mdt_obd; + struct obd_type *type; int rc; ENTRY; @@ -204,11 +222,42 @@ static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt, obd->obd_lu_dev = ld; ld->ld_obd = obd; + /* look-up the parent MDT to steal its ldlm namespace ... */ + mdt_obd = class_name2obd(lustre_cfg_string(cfg, 2)); + if (mdt_obd == NULL) + RETURN(-ENOENT); + + /* grab reference on MDT namespace. kind of a hack until we have our + * own namespace & service threads */ + LASSERT(mdt_obd->obd_namespace != NULL); + ldlm_namespace_get(mdt_obd->obd_namespace); + obd->obd_namespace = mdt_obd->obd_namespace; + qmt->qmt_ns = obd->obd_namespace; + /* connect to backend osd device */ rc = qmt_connect_to_osd(env, qmt, cfg); if (rc) GOTO(out, rc); + /* at the moment there is no linkage between lu_type and obd_type, so + * we lookup obd_type this way */ + type = class_search_type(LUSTRE_QMT_NAME); + LASSERT(type != NULL); + + /* register proc directory associated with this qmt */ + qmt->qmt_proc = lprocfs_register(qmt->qmt_svname, type->typ_procroot, + NULL, NULL); + if (IS_ERR(qmt->qmt_proc)) { + rc = PTR_ERR(qmt->qmt_proc); + CERROR("%s: failed to create qmt proc entry (%d)\n", + qmt->qmt_svname, rc); + GOTO(out, rc); + } + + /* initialize pool configuration */ + rc = qmt_pool_init(env, qmt); + if (rc) + GOTO(out, rc); EXIT; out: if (rc) @@ -372,7 +421,27 @@ static int qmt_device_prepare(const struct lu_env *env, struct lu_device *parent, struct lu_device *ld) { - return 0; + struct qmt_device *qmt = lu2qmt_dev(ld); + struct 
dt_object *qmt_root; + int rc; + ENTRY; + + /* initialize quota master root directory where all index files will be + * stored */ + qmt_root = lquota_disk_dir_find_create(env, qmt->qmt_child, NULL, + QMT_DIR); + if (IS_ERR(qmt_root)) { + rc = PTR_ERR(qmt_root); + CERROR("%s: failed to create master quota directory (%d)\n", + qmt->qmt_svname, rc); + RETURN(rc); + } + + /* initialize on-disk indexes associated with each pool */ + rc = qmt_pool_prepare(env, qmt, qmt_root); + + lu_object_put(env, &qmt_root->do_lu); + RETURN(rc); } /* diff --git a/lustre/quota/qmt_entry.c b/lustre/quota/qmt_entry.c new file mode 100644 index 0000000..ee57900 --- /dev/null +++ b/lustre/quota/qmt_entry.c @@ -0,0 +1,414 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2012 Intel, Inc. + * Use is subject to license terms. + * + * Author: Johann Lombardi + * Author: Niu Yawei + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif + +#define DEBUG_SUBSYSTEM S_LQUOTA + +#include "qmt_internal.h" + +/* + * Initialize qmt-specific fields of quota entry. 
+ * + * \param lqe - is the quota entry to initialize + * \param arg - is the pointer to the qmt_pool_info structure + */ +static void qmt_lqe_init(struct lquota_entry *lqe, void *arg) +{ + LASSERT(lqe_is_master(lqe)); + + lqe->lqe_revoke_time = 0; + cfs_init_rwsem(&lqe->lqe_sem); +} + +/* + * Update a lquota entry. This is done by reading quota settings from the global + * index. The lquota entry must be write locked. + * + * \param env - the environment passed by the caller + * \param lqe - is the quota entry to refresh + * \param arg - is the pointer to the qmt_pool_info structure + */ +static int qmt_lqe_read(const struct lu_env *env, struct lquota_entry *lqe, + void *arg) +{ + struct qmt_thread_info *qti = qmt_info(env); + struct qmt_pool_info *pool = (struct qmt_pool_info *)arg; + int rc; + ENTRY; + + LASSERT(lqe_is_master(lqe)); + + /* read record from disk */ + rc = lquota_disk_read(env, pool->qpi_glb_obj[lqe->lqe_site->lqs_qtype], + &lqe->lqe_id, (struct dt_rec *)&qti->qti_glb_rec); + + switch (rc) { + case -ENOENT: + /* no such entry, assume quota isn't enforced for this user */ + lqe->lqe_enforced = false; + break; + case 0: + /* copy quota settings from on-disk record */ + lqe->lqe_granted = qti->qti_glb_rec.qbr_granted; + lqe->lqe_hardlimit = qti->qti_glb_rec.qbr_hardlimit; + lqe->lqe_softlimit = qti->qti_glb_rec.qbr_softlimit; + lqe->lqe_gracetime = qti->qti_glb_rec.qbr_time; + + if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0) + /* {hard,soft}limit=0 means no quota enforced */ + lqe->lqe_enforced = false; + else + lqe->lqe_enforced = true; + + break; + default: + LQUOTA_ERROR(lqe, "failed to read quota entry from disk, rc:%d", + rc); + RETURN(rc); + } + + LQUOTA_DEBUG(lqe, "read"); + RETURN(0); +} + +/* + * Print lqe information for debugging. 
+ * + * \param lqe - is the quota entry to debug + * \param arg - is the pointer to the qmt_pool_info structure + * \param msgdata - debug message + * \param fmt - format of debug message + */ +static void qmt_lqe_debug(struct lquota_entry *lqe, void *arg, + struct libcfs_debug_msg_data *msgdata, + const char *fmt, va_list args) +{ + struct qmt_pool_info *pool = (struct qmt_pool_info *)arg; + + libcfs_debug_vmsg2(msgdata, fmt, args, + "qmt:%s pool:%d-%s id:"LPU64" enforced:%d hard:"LPU64 + " soft:"LPU64" granted:"LPU64" time:"LPU64" qunit:" + LPU64" edquot:%d revoke:"LPU64"\n", + pool->qpi_qmt->qmt_svname, + pool->qpi_key & 0x0000ffff, + RES_NAME(pool->qpi_key >> 16), + lqe->lqe_id.qid_uid, lqe->lqe_enforced, + lqe->lqe_hardlimit, lqe->lqe_softlimit, + lqe->lqe_granted, lqe->lqe_gracetime, + lqe->lqe_qunit, lqe->lqe_edquot, + lqe->lqe_revoke_time); +} + +/* + * Vector of quota entry operations supported on the master + */ +struct lquota_entry_operations qmt_lqe_ops = { + .lqe_init = qmt_lqe_init, + .lqe_read = qmt_lqe_read, + .lqe_debug = qmt_lqe_debug, +}; + +/* + * Reserve enough credits to update records in both the global index and + * the slave index identified by \slv_obj + * + * \param env - is the environment passed by the caller + * \param lqe - is the quota entry associated with the identifier + * subject to the change + * \param slv_obj - is the dt_object associated with the index file + * \param restore - is a temporary storage for current quota settings which will + * be restored if something goes wrong at index update time. 
+ */ +struct thandle *qmt_trans_start_with_slv(const struct lu_env *env, + struct lquota_entry *lqe, + struct dt_object *slv_obj, + struct qmt_lqe_restore *restore) +{ + struct qmt_device *qmt; + struct thandle *th; + int rc; + ENTRY; + + LASSERT(lqe != NULL); + LASSERT(lqe_is_master(lqe)); + + qmt = lqe2qpi(lqe)->qpi_qmt; + + if (slv_obj != NULL) + LQUOTA_DEBUG(lqe, "declare write for slv "DFID, + PFID(lu_object_fid(&slv_obj->do_lu))); + + /* start transaction */ + th = dt_trans_create(env, qmt->qmt_child); + if (IS_ERR(th)) + RETURN(th); + + if (slv_obj == NULL) + /* quota settings on master are updated synchronously for the + * time being */ + th->th_sync = 1; + + /* reserve credits for global index update */ + rc = lquota_disk_declare_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id); + if (rc) + GOTO(out, rc); + + if (slv_obj != NULL) { + /* reserve credits for slave index update */ + rc = lquota_disk_declare_write(env, th, slv_obj, &lqe->lqe_id); + if (rc) + GOTO(out, rc); + } + + /* start transaction */ + rc = dt_trans_start_local(env, qmt->qmt_child, th); + if (rc) + GOTO(out, rc); + + EXIT; +out: + if (rc) { + dt_trans_stop(env, qmt->qmt_child, th); + th = ERR_PTR(rc); + LQUOTA_ERROR(lqe, "failed to slv declare write for "DFID + ", rc:%d", PFID(lu_object_fid(&slv_obj->do_lu)), + rc); + } else { + restore->qlr_hardlimit = lqe->lqe_hardlimit; + restore->qlr_softlimit = lqe->lqe_softlimit; + restore->qlr_gracetime = lqe->lqe_gracetime; + restore->qlr_granted = lqe->lqe_granted; + restore->qlr_qunit = lqe->lqe_qunit; + } + return th; +} + +/* + * Reserve enough credits to update a record in the global index + * + * \param env - is the environment passed by the caller + * \param lqe - is the quota entry to be modified in the global index + * \param restore - is a temporary storage for current quota settings which will + * be restored if something goes wrong at index update time. 
+ */ +struct thandle *qmt_trans_start(const struct lu_env *env, + struct lquota_entry *lqe, + struct qmt_lqe_restore *restore) +{ + LQUOTA_DEBUG(lqe, "declare write"); + return qmt_trans_start_with_slv(env, lqe, NULL, restore); +} + +/* + * Update record associated with a quota entry in the global index. + * If LQUOTA_BUMP_VER is set, then the global index version must also be + * bumped. + * The entry must be at least read locked, dirty and up-to-date. + * + * \param env - the environment passed by the caller + * \param th - is the transaction handle to be used for the disk writes + * \param lqe - is the quota entry to udpate + * \param obj - is the dt_object associated with the index file + * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER. + * \param ver - is used to return the new version of the index. + * + * \retval - 0 on success and lqe dirty flag cleared, + * appropriate error on failure and uptodate flag cleared. + */ +int qmt_glb_write(const struct lu_env *env, struct thandle *th, + struct lquota_entry *lqe, __u32 flags, __u64 *ver) +{ + struct qmt_thread_info *qti = qmt_info(env); + struct lquota_glb_rec *rec; + int rc; + ENTRY; + + LASSERT(lqe != NULL); + LASSERT(lqe_is_master(lqe)); + LASSERT(lqe_is_locked(lqe)); + LASSERT(lqe->lqe_uptodate); + LASSERT((flags & ~(LQUOTA_BUMP_VER | LQUOTA_SET_VER)) == 0); + + LQUOTA_DEBUG(lqe, "write glb"); + + if (!lqe->lqe_enforced && lqe->lqe_granted == 0 && + lqe->lqe_id.qid_uid != 0) { + /* quota isn't enforced any more for this entry and there is no + * more space granted to slaves, let's just remove the entry + * from the index */ + rec = NULL; + } else { + rec = &qti->qti_glb_rec; + + /* fill global index with updated quota settings */ + rec->qbr_granted = lqe->lqe_granted; + rec->qbr_hardlimit = lqe->lqe_hardlimit; + rec->qbr_softlimit = lqe->lqe_softlimit; + rec->qbr_time = lqe->lqe_gracetime; + } + + /* write new quota settings */ + rc = lquota_disk_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id, + 
(struct dt_rec *)rec, flags, ver); + if (rc) + /* we failed to write the new quota settings to disk, report + * error to caller who will restore the initial value */ + LQUOTA_ERROR(lqe, "failed to update global index, rc:%d", rc); + + RETURN(rc); +} + +/* + * Read from disk how much quota space is allocated to a slave. + * This is done by reading records from the dedicated slave index file. + * Return in \granted how much quota space is currently allocated to the + * slave. + * The entry must be at least read locked. + * + * \param env - the environment passed by the caller + * \param lqe - is the quota entry associated with the identifier to look-up + * in the slave index + * \param slv_obj - is the dt_object associated with the slave index + * \param granted - is the output parameter where to return how much space + * is granted to the slave. + * + * \retval - 0 on success, appropriate error on failure + */ +int qmt_slv_read(const struct lu_env *env, struct lquota_entry *lqe, + struct dt_object *slv_obj, __u64 *granted) +{ + struct qmt_thread_info *qti = qmt_info(env); + struct lquota_slv_rec *slv_rec = &qti->qti_slv_rec; + int rc; + ENTRY; + + LASSERT(lqe != NULL); + LASSERT(lqe_is_master(lqe)); + LASSERT(lqe_is_locked(lqe)); + + LQUOTA_DEBUG(lqe, "read slv "DFID, + PFID(lu_object_fid(&slv_obj->do_lu))); + + /* read slave record from disk */ + rc = lquota_disk_read(env, slv_obj, &lqe->lqe_id, + (struct dt_rec *)slv_rec); + switch (rc) { + case -ENOENT: + *granted = 0; + break; + case 0: + /* extract granted from on-disk record */ + *granted = slv_rec->qsr_granted; + break; + default: + LQUOTA_ERROR(lqe, "failed to read slave record "DFID, + PFID(lu_object_fid(&slv_obj->do_lu))); + RETURN(rc); + } + + LQUOTA_DEBUG(lqe, "successful slv read "LPU64, *granted); + + RETURN(0); +} + +/* + * Update record in slave index file. + * The entry must be at least read locked. 
+ * + * \param env - the environment passed by the caller + * \param th - is the transaction handle to be used for the disk writes + * \param lqe - is the dirty quota entry which will be updated at the same time + * as the slave index + * \param slv_obj - is the dt_object associated with the slave index + * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER. + * \param ver - is used to return the new version of the index. + * \param granted - is the new amount of quota space owned by the slave + * + * \retval - 0 on success, appropriate error on failure + */ +int qmt_slv_write(const struct lu_env *env, struct thandle *th, + struct lquota_entry *lqe, struct dt_object *slv_obj, + __u32 flags, __u64 *ver, __u64 granted) +{ + struct qmt_thread_info *qti = qmt_info(env); + struct lquota_slv_rec *rec; + int rc; + ENTRY; + + LASSERT(lqe != NULL); + LASSERT(lqe_is_master(lqe)); + LASSERT(lqe_is_locked(lqe)); + + LQUOTA_DEBUG(lqe, "write slv "DFID" granted:"LPU64, + PFID(lu_object_fid(&slv_obj->do_lu)), granted); + + if (granted == 0) { + /* this slave does not own any quota space for this ID any more, + * so let's just remove the entry from the index */ + rec = NULL; + } else { + rec = &qti->qti_slv_rec; + + /* updated space granted to this slave */ + rec->qsr_granted = granted; + } + + /* write new granted space */ + rc = lquota_disk_write(env, th, slv_obj, &lqe->lqe_id, + (struct dt_rec *)rec, flags, ver); + if (rc) { + LQUOTA_ERROR(lqe, "failed to update slave index "DFID" granted:" + LPU64, PFID(lu_object_fid(&slv_obj->do_lu)), + granted); + RETURN(rc); + } + + RETURN(0); +} + +/* + * Check whether new limits are valid for this pool + * + * \param lqe - is the quota entry subject to the setquota + * \param hard - is the new hard limit + * \param soft - is the new soft limit + */ +int qmt_validate_limits(struct lquota_entry *lqe, __u64 hard, __u64 soft) +{ + ENTRY; + + if (hard != 0 && soft > hard) + /* soft limit must be less than hard limit */ + RETURN(-EINVAL); 
+ RETURN(0); +} diff --git a/lustre/quota/qmt_handler.c b/lustre/quota/qmt_handler.c index c077867..19e706f 100644 --- a/lustre/quota/qmt_handler.c +++ b/lustre/quota/qmt_handler.c @@ -38,6 +38,263 @@ #include "qmt_internal.h" /* + * Fetch grace time for either inode or block. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param pool_id - is the 16-bit pool identifier + * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode + * (i.e. LQUOTA_RES_MD) + * \param qtype - is the quota type + * \param time - is the output variable where to copy the grace time + */ +static int qmt_getinfo(const struct lu_env *env, struct qmt_device *qmt, + __u16 pool_id, __u8 restype, __u8 qtype, __u64 *time) +{ + struct qmt_thread_info *qti = qmt_info(env); + union lquota_id *id = &qti->qti_id_bis; + struct lquota_entry *lqe; + ENTRY; + + /* Global grace time is stored in quota settings of ID 0. */ + id->qid_uid = 0; + + /* look-up quota entry storing grace time */ + lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + lqe_read_lock(lqe); + LQUOTA_DEBUG(lqe, "getinfo"); + /* copy grace time */ + *time = lqe->lqe_gracetime; + lqe_read_unlock(lqe); + + lqe_putref(lqe); + RETURN(0); +} + +/* + * Update grace time for either inode or block. + * Global grace time is stored in quota settings of ID 0. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param pool_id - is the 16-bit pool identifier + * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode + * (i.e. 
LQUOTA_RES_MD) + * \param qtype - is the quota type + * \param time - is the new grace time + */ +static int qmt_setinfo(const struct lu_env *env, struct qmt_device *qmt, + __u16 pool_id, __u8 restype, __u8 qtype, __u64 time) +{ + struct qmt_thread_info *qti = qmt_info(env); + union lquota_id *id = &qti->qti_id_bis; + struct lquota_entry *lqe; + struct thandle *th = NULL; + int rc; + ENTRY; + + /* Global grace time is stored in quota settings of ID 0. */ + id->qid_uid = 0; + + /* look-up quota entry storing the global grace time */ + lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + /* allocate & start transaction with enough credits to update grace + * time in the global index file */ + th = qmt_trans_start(env, lqe, &qti->qti_restore); + if (IS_ERR(th)) + GOTO(out_nolock, rc = PTR_ERR(th)); + + /* write lock quota entry storing the grace time */ + lqe_write_lock(lqe); + if (lqe->lqe_gracetime == time) + /* grace time is the same */ + GOTO(out, rc = 0); + + LQUOTA_DEBUG(lqe, "setinfo time:"LPU64, time); + + /* set new grace time */ + lqe->lqe_gracetime = time; + /* always set enforced bit for ID 0 to make sure it does not go away */ + lqe->lqe_enforced = true; + + /* write new grace time to disk, no need for version bump */ + rc = qmt_glb_write(env, th, lqe, 0, NULL); + if (rc) { + /* restore initial grace time */ + qmt_restore(lqe, &qti->qti_restore); + GOTO(out, rc); + } + EXIT; +out: + lqe_write_unlock(lqe); +out_nolock: + lqe_putref(lqe); + if (th != NULL && !IS_ERR(th)) + dt_trans_stop(env, qmt->qmt_child, th); + return rc; +} + +/* + * Retrieve quota settings for a given identifier. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param pool_id - is the 16-bit pool identifier + * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode + * (i.e. 
LQUOTA_RES_MD) + * \param qtype - is the quota type + * \param id - is the quota indentifier for which we want to acces quota + * settings. + * \param hard - is the output variable where to copy the hard limit + * \param soft - is the output variable where to copy the soft limit + * \param time - is the output variable where to copy the grace time + */ +static int qmt_getquota(const struct lu_env *env, struct qmt_device *qmt, + __u16 pool_id, __u8 restype, __u8 qtype, + union lquota_id *id, __u64 *hard, __u64 *soft, + __u64 *time) +{ + struct lquota_entry *lqe; + ENTRY; + + /* look-up lqe structure containing quota settings */ + lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + /* copy quota settings */ + lqe_read_lock(lqe); + LQUOTA_DEBUG(lqe, "getquota"); + *hard = lqe->lqe_hardlimit; + *soft = lqe->lqe_softlimit; + *time = lqe->lqe_gracetime; + lqe_read_unlock(lqe); + + lqe_putref(lqe); + RETURN(0); +} + +/* + * Update quota settings for a given identifier. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param pool_id - is the 16-bit pool identifier + * \param restype - is the pool type, either block (i.e. LQUOTA_RES_DT) or inode + * (i.e. LQUOTA_RES_MD) + * \param qtype - is the quota type + * \param id - is the quota indentifier for which we want to modify quota + * settings. 
+ * \param hard - is the new hard limit + * \param soft - is the new soft limit + * \param time - is the new grace time + * \param valid - is the list of settings to change + */ +static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt, + __u16 pool_id, __u8 restype, __u8 qtype, + union lquota_id *id, __u64 hard, __u64 soft, __u64 time, + __u32 valid) +{ + struct qmt_thread_info *qti = qmt_info(env); + struct lquota_entry *lqe; + struct thandle *th = NULL; + __u64 grace, ver; + bool dirtied = false, bump_version = false; + int rc = 0; + ENTRY; + + /* fetch global grace time */ + rc = qmt_getinfo(env, qmt, pool_id, restype, qtype, &grace); + if (rc) + RETURN(rc); + + /* look-up quota entry associated with this ID */ + lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + /* allocate & start transaction with enough credits to update quota + * settings in the global index file */ + th = qmt_trans_start(env, lqe, &qti->qti_restore); + if (IS_ERR(th)) + GOTO(out_nolock, rc = PTR_ERR(th)); + + lqe_write_lock(lqe); + LQUOTA_DEBUG(lqe, "setquota valid:%x hard:"LPU64" soft:"LPU64 + " time:"LPU64, valid, hard, soft, time); + + if ((valid & QIF_TIMES) != 0 && lqe->lqe_gracetime != time) { + /* change time settings */ + lqe->lqe_gracetime = time; + dirtied = true; + } + + if ((valid & QIF_LIMITS) != 0 && + (lqe->lqe_hardlimit != hard || lqe->lqe_softlimit != soft)) { + bool enforced = lqe->lqe_enforced; + + rc = qmt_validate_limits(lqe, hard, soft); + if (rc) + GOTO(out, rc); + + /* change quota limits */ + lqe->lqe_hardlimit = hard; + lqe->lqe_softlimit = soft; + + /* clear grace time */ + if (lqe->lqe_softlimit == 0 || + lqe->lqe_granted <= lqe->lqe_softlimit) + /* no soft limit or below soft limit, let's clear grace + * time */ + lqe->lqe_gracetime = 0; + else if ((valid & QIF_TIMES) == 0) + /* set grace only if user hasn't provided his own */ + lqe->lqe_gracetime = cfs_time_current_sec() + grace; + 
+ /* change enforced status based on new parameters */ + if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0) + lqe->lqe_enforced = false; + else + lqe->lqe_enforced = true; + + if ((enforced && !lqe->lqe_enforced) || + (!enforced && lqe->lqe_enforced)) + /* if enforced status has changed, we need to inform + * slave, therefore we need to bump the version */ + bump_version = true; + + dirtied = true; + } + + if (dirtied) { + /* write new quota settings to disk */ + rc = qmt_glb_write(env, th, lqe, + bump_version ? LQUOTA_BUMP_VER : 0, &ver); + if (rc) { + /* restore initial quota settings */ + qmt_restore(lqe, &qti->qti_restore); + GOTO(out, rc); + } + } + EXIT; +out: + lqe_write_unlock(lqe); +out_nolock: + lqe_putref(lqe); + + if (th != NULL && !IS_ERR(th)) + dt_trans_stop(env, qmt->qmt_child, th); + + return rc; +} + +/* * Handle quotactl request. * * \param env - is the environment passed by the caller @@ -47,7 +304,10 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, struct obd_quotactl *oqctl) { + struct qmt_thread_info *qti = qmt_info(env); + union lquota_id *id = &qti->qti_id; struct qmt_device *qmt = lu2qmt_dev(ld); + struct obd_dqblk *dqb = &oqctl->qc_dqblk; int rc = 0; ENTRY; @@ -59,25 +319,109 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld, switch (oqctl->qc_cmd) { - case Q_GETINFO: - case Q_SETINFO: - case Q_SETQUOTA: - /* XXX: not implemented yet. 
*/ - CERROR("quotactl operation %d not implemented yet\n", - oqctl->qc_cmd); - RETURN(-EOPNOTSUPP); - - case Q_GETQUOTA: - /* XXX: return no limit for now, just for testing purpose */ - memset(&oqctl->qc_dqblk, 0, sizeof(struct obd_dqblk)); - oqctl->qc_dqblk.dqb_valid = QIF_LIMITS; - rc = 0; + case Q_GETINFO: /* read grace times */ + /* read inode grace time */ + rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type, + &oqctl->qc_dqinfo.dqi_igrace); + if (rc) + break; + + /* read block grace time */ + rc = qmt_getinfo(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type, + &oqctl->qc_dqinfo.dqi_bgrace); + break; + + case Q_SETINFO: /* modify grace times */ + /* setinfo should be using dqi->dqi_valid, but lfs incorrectly + * sets the valid flags in dqb->dqb_valid instead, try to live + * with that ... */ + if ((dqb->dqb_valid & QIF_ITIME) != 0) { + /* set inode grace time */ + rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_MD, + oqctl->qc_type, + oqctl->qc_dqinfo.dqi_igrace); + if (rc) + break; + } + + if ((dqb->dqb_valid & QIF_BTIME) != 0) + /* set block grace time */ + rc = qmt_setinfo(env, qmt, 0, LQUOTA_RES_DT, + oqctl->qc_type, + oqctl->qc_dqinfo.dqi_bgrace); + break; + + case Q_GETQUOTA: /* consult quota limit */ + /* There is no quota limit for root user & group */ + if (oqctl->qc_id == 0) { + memset(dqb, 0, sizeof(*dqb)); + dqb->dqb_valid = QIF_LIMITS | QIF_TIMES; + break; + } + /* extract quota ID from quotactl request */ + id->qid_uid = oqctl->qc_id; + + /* look-up inode quota settings */ + rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_MD, oqctl->qc_type, + id, &dqb->dqb_ihardlimit, + &dqb->dqb_isoftlimit, &dqb->dqb_itime); + if (rc) + break; + + dqb->dqb_valid |= QIF_ILIMITS | QIF_ITIME; + /* master isn't aware of actual inode usage */ + dqb->dqb_curinodes = 0; + + /* look-up block quota settings */ + rc = qmt_getquota(env, qmt, 0, LQUOTA_RES_DT, oqctl->qc_type, + id, &dqb->dqb_bhardlimit, + &dqb->dqb_bsoftlimit, &dqb->dqb_btime); + if (rc) + break; + + dqb->dqb_valid 
|= QIF_BLIMITS | QIF_BTIME; + /* master doesn't know the actual block usage */ + dqb->dqb_curspace = 0; + break; + + case Q_SETQUOTA: /* change quota limits */ + if (oqctl->qc_id == 0) + /* can't enforce a quota limit for root user & group */ + RETURN(-EPERM); + /* extract quota ID from quotactl request */ + id->qid_uid = oqctl->qc_id; + + if ((dqb->dqb_valid & QIF_IFLAGS) != 0) { + /* update inode quota settings */ + rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_MD, + oqctl->qc_type, id, + dqb->dqb_ihardlimit, + dqb->dqb_isoftlimit, dqb->dqb_itime, + dqb->dqb_valid & QIF_IFLAGS); + if (rc) + break; + } + + if ((dqb->dqb_valid & QIF_BFLAGS) != 0) + /* update block quota settings */ + rc = qmt_setquota(env, qmt, 0, LQUOTA_RES_DT, + oqctl->qc_type, id, + dqb->dqb_bhardlimit, + dqb->dqb_bsoftlimit, dqb->dqb_btime, + dqb->dqb_valid & QIF_BFLAGS); break; + case Q_QUOTAON: + case Q_QUOTAOFF: /* quota is always turned on on the master */ + RETURN(0); + + case LUSTRE_Q_INVALIDATE: /* not supported any more */ + RETURN(-ENOTSUPP); + default: CERROR("%s: unsupported quotactl command: %d\n", qmt->qmt_svname, oqctl->qc_cmd); - RETURN(-EFAULT); + RETURN(-ENOTSUPP); } RETURN(rc); @@ -104,6 +448,8 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld, if (repbody == NULL) RETURN(err_serious(-EFAULT)); + /* XXX: to be implemented */ + RETURN(0); } diff --git a/lustre/quota/qmt_internal.h b/lustre/quota/qmt_internal.h index abe782d..7802006 100644 --- a/lustre/quota/qmt_internal.h +++ b/lustre/quota/qmt_internal.h @@ -52,12 +52,125 @@ struct qmt_device { * index files */ struct obd_export *qmt_child_exp; struct dt_device *qmt_child; + + /* pointer to ldlm namespace to be used for quota locks */ + struct ldlm_namespace *qmt_ns; + + /* Hash table containing a qmt_pool_info structure for each pool + * this quota master is in charge of. 
We only have 2 pools in this + * hash for the time being: + * - one for quota management on the default metadata pool + * - one for quota management on the default data pool + * + * Once we support quota on non-default pools, then more pools will + * be added to this hash table and pool master setup would have to be + * handled via configuration logs */ + cfs_hash_t *qmt_pool_hash; + + /* List of pools managed by this master target */ + cfs_list_t qmt_pool_list; + + /* procfs root directory for this qmt */ + cfs_proc_dir_entry_t *qmt_proc; + + unsigned long qmt_stopping:1; /* qmt is stopping */ + +}; + +/* + * Per-pool quota information. + * The qmt creates one such structure for each pool + * with quota enforced. All the structures are kept in a hash which is used to + * determine whether or not quota is enforced for a given pool. + * We currently only support the default data pool and default metadata pool + * with the pool_id 0. + */ +struct qmt_pool_info { + /* link to qmt's pool hash */ + cfs_hlist_node_t qpi_hash; + + /* chained list of all pools managed by the same qmt */ + cfs_list_t qpi_linkage; + + /* Pool key composed of pool_id | (pool_type << 16) + * Only pool ID 0 is supported for now and the pool type is either + * LQUOTA_RES_MD or LQUOTA_RES_DT. + * immutable after creation. */ + __u32 qpi_key; + + /* track users of this pool instance */ + cfs_atomic_t qpi_ref; + + /* back pointer to master target + * immutable after creation. */ + struct qmt_device *qpi_qmt; + + /* pointer to dt object associated with global indexes for both user + * and group quota */ + struct dt_object *qpi_glb_obj[MAXQUOTAS]; + + /* A pool supports two different quota types: user and group quota. + * Each quota type has its own global index and lquota_entry hash table.
+ */ + struct lquota_site *qpi_site[MAXQUOTAS]; + + /* number of slaves registered for each quota type */ + int qpi_slv_nr[MAXQUOTAS]; + + /* procfs root directory for this pool */ + cfs_proc_dir_entry_t *qpi_proc; + + /* pool directory where all indexes related to this pool instance are + * stored */ + struct dt_object *qpi_root; + + /* Global quota parameters which apply to all quota types */ + /* the least value of qunit */ + unsigned long qpi_least_qunit; +}; + +/* + * Helper routines and prototypes + */ + +/* helper routine to find qmt_pool_info associated with a lquota_entry */ +static inline struct qmt_pool_info *lqe2qpi(struct lquota_entry *lqe) +{ + LASSERT(lqe_is_master(lqe)); + return (struct qmt_pool_info *)lqe->lqe_site->lqs_parent; +} + +/* return true if someone holds either a read or write lock on the lqe */ +static inline bool lqe_is_locked(struct lquota_entry *lqe) +{ + LASSERT(lqe_is_master(lqe)); + if (cfs_down_write_trylock(&lqe->lqe_sem) == 0) + return true; + lqe_write_unlock(lqe); + return false; +} + +/* values to be restored if something wrong happens during lqe writeback */ +struct qmt_lqe_restore { + __u64 qlr_hardlimit; + __u64 qlr_softlimit; + __u64 qlr_gracetime; + __u64 qlr_granted; + __u64 qlr_qunit; }; /* Common data shared by qmt handlers */ struct qmt_thread_info { union lquota_rec qti_rec; union lquota_id qti_id; + union lquota_id qti_id_bis; + char qti_buf[MTI_NAME_MAXLEN]; + struct lu_fid qti_fid; + struct ldlm_res_id qti_resid; + union ldlm_gl_desc qti_gl_desc; + struct quota_body qti_body; + struct quota_body qti_repbody; + struct qmt_lqe_restore qti_restore; }; extern struct lu_context_key qmt_thread_key; @@ -89,6 +202,47 @@ static inline struct lu_device *qmt2lu_dev(struct qmt_device *qmt) return &qmt->qmt_dt_dev.dd_lu_dev; } +#define LQE_ROOT(lqe) (lqe2qpi(lqe)->qpi_root) +#define LQE_GLB_OBJ(lqe) (lqe2qpi(lqe)->qpi_glb_obj[lqe->lqe_site->lqs_qtype]) + +static inline void qmt_restore(struct lquota_entry *lqe, + struct 
qmt_lqe_restore *restore) +{ + lqe->lqe_hardlimit = restore->qlr_hardlimit; + lqe->lqe_softlimit = restore->qlr_softlimit; + lqe->lqe_gracetime = restore->qlr_gracetime; + lqe->lqe_granted = restore->qlr_granted; + lqe->lqe_qunit = restore->qlr_qunit; +} + +/* qmt_pool.c */ +void qmt_pool_fini(const struct lu_env *, struct qmt_device *); +int qmt_pool_init(const struct lu_env *, struct qmt_device *); +int qmt_pool_prepare(const struct lu_env *, struct qmt_device *, + struct dt_object *); +int qmt_pool_new_conn(const struct lu_env *, struct qmt_device *, + struct lu_fid *, struct lu_fid *, __u64 *, + struct obd_uuid *); +struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *, + struct qmt_device *, int, int, int, + union lquota_id *); +/* qmt_entry.c */ +extern struct lquota_entry_operations qmt_lqe_ops; +struct thandle *qmt_trans_start_with_slv(const struct lu_env *, + struct lquota_entry *, + struct dt_object *, + struct qmt_lqe_restore *); +struct thandle *qmt_trans_start(const struct lu_env *, struct lquota_entry *, + struct qmt_lqe_restore *); +int qmt_glb_write(const struct lu_env *, struct thandle *, + struct lquota_entry *, __u32, __u64 *); +int qmt_slv_write(const struct lu_env *, struct thandle *, + struct lquota_entry *, struct dt_object *, __u32, __u64 *, + __u64); +int qmt_slv_read(const struct lu_env *, struct lquota_entry *, + struct dt_object *, __u64 *); +int qmt_validate_limits(struct lquota_entry *, __u64, __u64); + /* qmt_lock.c */ int qmt_intent_policy(const struct lu_env *, struct lu_device *, struct ptlrpc_request *, struct ldlm_lock **, int); diff --git a/lustre/quota/qmt_pool.c b/lustre/quota/qmt_pool.c new file mode 100644 index 0000000..3bea3d9 --- /dev/null +++ b/lustre/quota/qmt_pool.c @@ -0,0 +1,642 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 
+ * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2012 Intel, Inc. + * Use is subject to license terms. + * + * Author: Johann Lombardi + * Author: Niu Yawei + */ + +/* + * A Quota Master Target has a hash table where it stores qmt_pool_info + * structures. There is one such structure for each pool managed by the QMT. + * + * Each pool can have different quota types enforced (typically user & group + * quota). A pool is in charge of managing lquota_entry structures for each + * quota type. This is done by creating one lquota_entry site per quota + * type. A site stores entries in a hash table and read quota settings from disk + * when a given ID isn't present in the hash. + * + * The pool API exported here is the following: + * - qmt_pool_init(): initializes the general QMT structures used to manage + * pools. + * - qmt_pool_fini(): frees the structures allocated by qmt_pool_fini(). + * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool. + * - qmt_pool_new_conn(): is used to create a new slave index file. + * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with + * a given ID. 
+ */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif + +#define DEBUG_SUBSYSTEM S_LQUOTA + +#include +#include +#include "qmt_internal.h" + +static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *); + +/* + * Static helper functions not used outside the scope of this file + */ + +/* + * Reference counter management for qmt_pool_info structures + */ +static inline void qpi_getref(struct qmt_pool_info *pool) +{ + cfs_atomic_inc(&pool->qpi_ref); +} + +static inline void qpi_putref(const struct lu_env *env, + struct qmt_pool_info *pool) +{ + LASSERT(atomic_read(&pool->qpi_ref) > 0); + if (cfs_atomic_dec_and_test(&pool->qpi_ref)) + qmt_pool_free(env, pool); +} + +static inline void qpi_putref_locked(struct qmt_pool_info *pool) +{ + LASSERT(cfs_atomic_read(&pool->qpi_ref) > 1); + cfs_atomic_dec(&pool->qpi_ref); +} + +/* + * Hash functions for qmt_pool_info management + */ + +static unsigned qpi_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask) +{ + return cfs_hash_u32_hash(*((__u32 *)key), mask); +} + +static void *qpi_hash_key(cfs_hlist_node_t *hnode) +{ + struct qmt_pool_info *pool; + pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash); + return &pool->qpi_key; +} + +static int qpi_hash_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + struct qmt_pool_info *pool; + pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash); + return pool->qpi_key == *((__u32 *)key); +} + +static void *qpi_hash_object(cfs_hlist_node_t *hnode) +{ + return cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash); +} + +static void qpi_hash_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct qmt_pool_info *pool; + pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash); + qpi_getref(pool); +} + +static void qpi_hash_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct qmt_pool_info *pool; + pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash); + qpi_putref_locked(pool); +} + +static void qpi_hash_exit(cfs_hash_t 
*hs, cfs_hlist_node_t *hnode) +{ + CERROR("Should not have any item left!\n"); +} + +/* vector of hash operations */ +static cfs_hash_ops_t qpi_hash_ops = { + .hs_hash = qpi_hash_hash, + .hs_key = qpi_hash_key, + .hs_keycmp = qpi_hash_keycmp, + .hs_object = qpi_hash_object, + .hs_get = qpi_hash_get, + .hs_put_locked = qpi_hash_put_locked, + .hs_exit = qpi_hash_exit +}; + +/* some procfs helpers */ +static int lprocfs_qpi_rd_state(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct qmt_pool_info *pool = (struct qmt_pool_info *)data; + int type, i = 0; + + LASSERT(pool != NULL); + + i = snprintf(page, count, + "pool:\n" + " id: %u\n" + " type: %s\n" + " ref: %d\n" + " least qunit: %lu\n", + pool->qpi_key & 0x0000ffff, + RES_NAME(pool->qpi_key >> 16), + cfs_atomic_read(&pool->qpi_ref), + pool->qpi_least_qunit); + + + for (type = 0; type < MAXQUOTAS; type++) + i += snprintf(page + i, count - i, + " %s:\n" + " #slv: %d\n" + " #lqe: %d\n", + QTYPE_NAME(type), + pool->qpi_slv_nr[type], + cfs_atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count)); + + return i; +} + +static struct lprocfs_vars lprocfs_quota_qpi_vars[] = { + { "info", lprocfs_qpi_rd_state, 0, 0}, + { NULL } +}; + +/* + * Allocate a new qmt_pool_info structure and add it to the pool hash table + * of the qmt. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param pool_id - is the 16-bit pool identifier of the new pool to add + * \param pool_type - is the resource type of this pool instance, either + * LQUOTA_RES_MD or LQUOTA_RES_DT. 
+ * + * \retval - 0 on success, appropriate error on failure + */ +static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt, + int pool_id, int pool_type) +{ + struct qmt_thread_info *qti = qmt_info(env); + struct qmt_pool_info *pool; + int rc = 0; + ENTRY; + + OBD_ALLOC_PTR(pool); + if (pool == NULL) + RETURN(-ENOMEM); + CFS_INIT_LIST_HEAD(&pool->qpi_linkage); + + /* assign key used by hash functions */ + pool->qpi_key = pool_id + (pool_type << 16); + + /* initialize refcount to 1, hash table will then grab an additional + * reference */ + atomic_set(&pool->qpi_ref, 1); + + /* set up least qunit size to use for this pool */ + pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type); + + /* create pool proc directory */ + sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id); + pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc, + lprocfs_quota_qpi_vars, pool); + if (IS_ERR(pool->qpi_proc)) { + rc = PTR_ERR(pool->qpi_proc); + CERROR("%s: failed to create proc entry for pool %s (%d)\n", + qmt->qmt_svname, qti->qti_buf, rc); + pool->qpi_proc = NULL; + GOTO(out, rc); + } + + /* grab reference on master target that this pool belongs to */ + lu_device_get(qmt2lu_dev(qmt)); + lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool); + pool->qpi_qmt = qmt; + + /* add to qmt hash table */ + rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key, + &pool->qpi_hash); + if (rc) { + CERROR("%s: failed to add pool %s to qmt hash (%d)\n", + qmt->qmt_svname, qti->qti_buf, rc); + GOTO(out, rc); + } + + /* add to qmt pool list */ + cfs_list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list); + EXIT; +out: + if (rc) + /* this frees the pool structure since refcount is equal to 1 */ + qpi_putref(env, pool); + return rc; +} + +/* + * Delete a qmt_pool_info instance and all structures associated. 
+ * + * \param env - is the environment passed by the caller + * \param pool - is the qmt_pool_info structure to free + */ +static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool) +{ + int qtype; + ENTRY; + + /* release proc entry */ + if (pool->qpi_proc) { + lprocfs_remove(&pool->qpi_proc); + pool->qpi_proc = NULL; + } + + /* release per-quota type site used to manage quota entries as well as + * references to global index files */ + for (qtype = 0; qtype < MAXQUOTAS; qtype++) { + /* release site */ + if (pool->qpi_site[qtype] != NULL && + !IS_ERR(pool->qpi_site[qtype])) + lquota_site_free(env, pool->qpi_site[qtype]); + /* release reference to global index */ + if (pool->qpi_glb_obj[qtype] != NULL && + !IS_ERR(pool->qpi_glb_obj[qtype])) + lu_object_put(env, &pool->qpi_glb_obj[qtype]->do_lu); + } + + /* release reference on pool directory */ + if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root)) + lu_object_put(env, &pool->qpi_root->do_lu); + + /* release reference on the master target */ + if (pool->qpi_qmt != NULL) { + struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt); + + lu_device_put(ld); + lu_ref_del(&ld->ld_reference, "pool", pool); + pool->qpi_qmt = NULL; + } + + LASSERT(cfs_list_empty(&pool->qpi_linkage)); + OBD_FREE_PTR(pool); +} + +/* + * Look-up a pool in the hash table based on the pool ID and type. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * \param pool_id - is the 16-bit identifier of the pool to look up + * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or + * LQUOTA_RES_DT. 
+ */ +static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env, + struct qmt_device *qmt, + int pool_id, int pool_type) +{ + struct qmt_pool_info *pool; + __u32 key; + ENTRY; + + LASSERT(qmt->qmt_pool_hash != NULL); + + /* look-up pool in hash table */ + key = pool_id + (pool_type << 16); + pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key); + if (pool == NULL) { + /* this qmt isn't managing this pool! */ + CERROR("%s: looking up quota entry for a pool (0x%x/%d) which " + "isn't managed by this quota master target\n", + qmt->qmt_svname, pool_id, pool_type); + RETURN(ERR_PTR(-ENOENT)); + } + RETURN(pool); +} + +/* + * Functions implementing the pool API, used by the qmt handlers + */ + +/* + * Destroy all pools which are still in the hash table and free the pool + * hash table. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target + * + */ +void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt) +{ + struct qmt_pool_info *pool; + cfs_list_t *pos, *n; + ENTRY; + + if (qmt->qmt_pool_hash == NULL) + RETURN_EXIT; + + /* parse list of pool and destroy each element */ + cfs_list_for_each_safe(pos, n, &qmt->qmt_pool_list) { + pool = cfs_list_entry(pos, struct qmt_pool_info, + qpi_linkage); + /* remove from hash */ + cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key, + &pool->qpi_hash); + + /* remove from list */ + cfs_list_del_init(&pool->qpi_linkage); + + /* release extra reference taken in qmt_pool_alloc */ + qpi_putref(env, pool); + } + LASSERT(cfs_list_empty(&qmt->qmt_pool_list)); + + cfs_hash_putref(qmt->qmt_pool_hash); + qmt->qmt_pool_hash = NULL; + EXIT; +} + +/* + * Initialize pool configure for the quota master target. For now, we only + * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs) + * pool which are instantiated in this function. 
+ * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target for which we have to initializa the + * pool configuration + * + * \retval - 0 on success, appropriate error on failure + */ +int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt) +{ + int res, rc = 0; + ENTRY; + + /* initialize pool hash table */ + qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH", + HASH_POOLS_CUR_BITS, + HASH_POOLS_MAX_BITS, + HASH_POOLS_BKT_BITS, 0, + CFS_HASH_MIN_THETA, + CFS_HASH_MAX_THETA, + &qpi_hash_ops, + CFS_HASH_DEFAULT); + if (qmt->qmt_pool_hash == NULL) { + CERROR("%s: failed to create pool hash table\n", + qmt->qmt_svname); + RETURN(-ENOMEM); + } + + /* initialize pool list */ + CFS_INIT_LIST_HEAD(&qmt->qmt_pool_list); + + /* Instantiate pool master for the default data and metadata pool (both + * have pool ID equals to 0). + * This code will have to be revisited once we support quota on + * non-default pools */ + for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) { + rc = qmt_pool_alloc(env, qmt, 0, res); + if (rc) + break; + } + + if (rc) + qmt_pool_fini(env, qmt); + + RETURN(rc); +} + +static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid, + char *slv_name, struct lu_fid *slv_fid, void *arg) +{ + int *nr = arg; + + /* one more slave */ + (*nr)++; + + return 0; +} + +/* + * Set up on-disk index files associated with each pool. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target for which we have to initializa the + * pool configuration + * \param qmt_root - is the on-disk directory created for the QMT. 
+ * + * \retval - 0 on success, appropriate error on failure + */ +int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt, + struct dt_object *qmt_root) +{ + struct qmt_thread_info *qti = qmt_info(env); + struct qmt_pool_info *pool; + struct dt_device *dev = NULL; + cfs_list_t *pos; + int rc = 0, qtype; + ENTRY; + + LASSERT(qmt->qmt_pool_hash != NULL); + + /* iterate over each pool in the hash and allocate a quota site for each + * one. This involves creating a global index file on disk */ + cfs_list_for_each(pos, &qmt->qmt_pool_list) { + struct dt_object *obj; + int pool_type, pool_id; + + pool = cfs_list_entry(pos, struct qmt_pool_info, + qpi_linkage); + + pool_id = pool->qpi_key & 0x0000ffff; + pool_type = pool->qpi_key >> 16; + if (dev == NULL) + dev = pool->qpi_qmt->qmt_child; + + /* allocate directory for this pool */ + sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id); + obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root, + qti->qti_buf); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + pool->qpi_root = obj; + + for (qtype = 0; qtype < MAXQUOTAS; qtype++) { + /* Generating FID of global index in charge of storing + * settings for this quota type */ + lquota_generate_fid(&qti->qti_fid, pool_id, pool_type, + qtype); + + /* open/create the global index file for this quota + * type */ + obj = lquota_disk_glb_find_create(env, dev, + pool->qpi_root, + &qti->qti_fid, false); + if (IS_ERR(obj)) { + rc = PTR_ERR(obj); + CERROR("%s: failed to create glb index copy for" + " %s type (%d)\n", qmt->qmt_svname, + QTYPE_NAME(qtype), rc); + RETURN(rc); + } + + pool->qpi_glb_obj[qtype] = obj; + + /* create quota entry site for this quota type */ + pool->qpi_site[qtype] = lquota_site_alloc(env, pool, + true, qtype, + &qmt_lqe_ops); + if (IS_ERR(pool->qpi_site[qtype])) { + rc = PTR_ERR(pool->qpi_site[qtype]); + CERROR("%s: failed to create site for %s type " + "(%d)\n", qmt->qmt_svname, + QTYPE_NAME(qtype), rc); + RETURN(rc); + } + + /* 
count number of slaves which already connected to + * the master in the past */ + pool->qpi_slv_nr[qtype] = 0; + rc = lquota_disk_for_each_slv(env, pool->qpi_root, + &qti->qti_fid, + qmt_slv_cnt, + &pool->qpi_slv_nr[qtype]); + if (rc) { + CERROR("%s: failed to scan & count slave " + "indexes for %s type (%d)\n", + qmt->qmt_svname, QTYPE_NAME(qtype), rc); + RETURN(rc); + } +#ifdef LPROCFS + /* add procfs file to dump the global index, mostly for + * debugging purpose */ + sprintf(qti->qti_buf, "glb-%s", QTYPE_NAME(qtype)); + rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf, + 0444, &lprocfs_quota_seq_fops, + obj); + if (rc) + CWARN("%s: Error adding procfs file for global" + "quota index "DFID", rc:%d\n", + qmt->qmt_svname, PFID(&qti->qti_fid), rc); +#endif + } + } + + RETURN(0); +} + +/* + * Handle new slave connection. Called when a slave enqueues the global quota + * lock at the beginning of the reintegration procedure. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target handling this request + * \param glb_fid - is the fid of the global index file + * \param slv_fid - is the fid of the newly created slave index file + * \param slv_ver - is the current version of the slave index file + * \param uuid - is the uuid of slave which is (re)connecting to the master + * target + * + * \retval - 0 on success, appropriate error on failure + */ +int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt, + struct lu_fid *glb_fid, struct lu_fid *slv_fid, + __u64 *slv_ver, struct obd_uuid *uuid) +{ + struct qmt_pool_info *pool; + struct dt_object *slv_obj; + int pool_id, pool_type, qtype; + bool created = false; + int rc = 0; + + /* extract pool info from global index FID */ + rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype); + if (rc) + RETURN(rc); + + /* look-up pool in charge of this global index FID */ + pool = qmt_pool_lookup(env, qmt, pool_id, pool_type); + if (IS_ERR(pool)) + 
RETURN(PTR_ERR(pool)); + + /* look-up slave index file */ + slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root, + glb_fid, uuid); + if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) { + /* create slave index file */ + slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child, + pool->qpi_root, glb_fid, + uuid, false); + created = true; + } + if (IS_ERR(slv_obj)) { + rc = PTR_ERR(slv_obj); + CERROR("%s: failed to create quota slave index file for %s (%d)" + "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc); + GOTO(out, rc); + } + + /* retrieve slave fid & current object version */ + memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid)); + *slv_ver = dt_version_get(env, slv_obj); + lu_object_put(env, &slv_obj->do_lu); + if (created) + pool->qpi_slv_nr[qtype]++; +out: + qpi_putref(env, pool); + RETURN(rc); +} + +/* + * Look-up a lquota_entry in the pool hash and allocate it if not found. + * + * \param env - is the environment passed by the caller + * \param qmt - is the quota master target for which we have to initializa the + * pool configuration + * \param pool_id - is the 16-bit identifier of the pool + * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT. + * \param qtype - is the quota type, either user or group. 
+ * \param qid - is the quota ID to look-up + * + * \retval - valid pointer to lquota entry on success, appropriate error on + * failure + */ +struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env, + struct qmt_device *qmt, + int pool_id, int pool_type, + int qtype, union lquota_id *qid) +{ + struct qmt_pool_info *pool; + struct lquota_entry *lqe; + ENTRY; + + /* look-up pool responsible for this global index FID */ + pool = qmt_pool_lookup(env, qmt, pool_id, pool_type); + if (IS_ERR(pool)) + RETURN((void *)pool); + + /* now that we have the pool, let's look-up the quota entry in the + * right quota site */ + lqe = lqe_locate(env, pool->qpi_site[qtype], qid); + + qpi_putref(env, pool); + RETURN(lqe); +}