From b2cb6fd1095f9c483b7bc1ebbbfdaef719aea87c Mon Sep 17 00:00:00 2001
From: Johann Lombardi
Date: Thu, 4 Oct 2012 16:18:26 +0200
Subject: [PATCH] LU-1842 quota: add core QSD code

Add acquire/release logic on the qsd side as well as reintegration,
which is handled in 3 steps:
- enqueue the global index lock
- fetch the global & slave indexes and update the local enforcement
  list and index copy
- adjust/release quota space as appropriate

This patch also fixes the ofd code to call ->ldo_recovery_complete
across the stack.

Signed-off-by: Johann Lombardi
Change-Id: Ib4a8910fe34de878159827c76b40a8f899aaad10
Signed-off-by: Niu Yawei
Reviewed-on: http://review.whamcloud.com/4182
Tested-by: Hudson
Reviewed-by: Alex Zhuravlev
Reviewed-by: Jinshan Xiong
Tested-by: Maloo
Reviewed-by: Niu Yawei
Reviewed-by: Oleg Drokin
---
 lustre/include/lquota.h          |   50 +-
 lustre/ofd/ofd_dev.c             |   27 +-
 lustre/ofd/ofd_internal.h        |    1 +
 lustre/ofd/ofd_obd.c             |   11 +-
 lustre/osd-ldiskfs/osd_handler.c |   28 +-
 lustre/osd-zfs/osd_handler.c     |   11 +-
 lustre/osp/osp_dev.c             |    4 +
 lustre/quota/Makefile.in         |   14 +-
 lustre/quota/lquota_internal.h   |    3 +
 lustre/quota/qsd_config.c        |   33 +-
 lustre/quota/qsd_handler.c       |  956 +++++++++++++++++++++++++++++++++++++
 lustre/quota/qsd_internal.h      |   88 +++-
 lustre/quota/qsd_lib.c           |  286 +++++++++++-
 lustre/quota/qsd_lock.c          |   16 +-
 lustre/quota/qsd_reint.c         |  532 ++++++++++++++++++++++
 lustre/quota/qsd_request.c       |    1 +
 lustre/quota/qsd_writeback.c     |  483 ++++++++++++++++++++
 17 files changed, 2461 insertions(+), 83 deletions(-)
 create mode 100644 lustre/quota/qsd_handler.c

diff --git a/lustre/include/lquota.h b/lustre/include/lquota.h
index 7af5baa..043daf1 100644
--- a/lustre/include/lquota.h
+++ b/lustre/include/lquota.h
@@ -122,36 +122,48 @@ struct qsd_instance;
 *           needed. qsd_prepare() should typically be called when
 *           ->ldo_prepare is invoked.
 *
+ * - qsd_start(): a qsd instance should be started once recovery is completed
+ *                (i.e. when ->ldo_recovery_complete is called). This is used
+ *                to notify the qsd layer that quota should now be enforced
+ *                again via the qsd_op_begin/end functions. The last step of
+ *                the reintegration procedure (namely usage reconciliation)
+ *                will be completed during start.
+ *
 * - qsd_fini(): is used to release a qsd_instance structure allocated with
 *               qsd_init(). This releases all quota slave objects and frees
 *               the structures associated with the qsd_instance.
 *
+ * - qsd_op_begin(): is used to enforce quota; it must be called in the
+ *                   declaration of each operation. qsd_op_end() should then
+ *                   be invoked later once all operations have been completed
+ *                   in order to release/adjust the quota space.
+ *                   Running qsd_op_begin() before qsd_start() isn't fatal and
+ *                   will return success.
+ *                   Once qsd_start() has been run, qsd_op_begin() will block
+ *                   until the reintegration procedure is completed.
+ *
+ * - qsd_op_end(): performs the post-operation quota processing. This must be
+ *                 called after the operation's transaction has stopped.
+ *                 While qsd_op_begin() must be invoked each time a new
+ *                 operation is declared, qsd_op_end() should be called only
+ *                 once for the whole transaction.
+ *
+ * - qsd_adjust_quota(): triggers pre-acquire/release if necessary.
+ *
 * Below are the function prototypes to be used by OSD layer to manage quota
 * enforcement. Arguments are documented where each function is defined.
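 *
 * As an illustration only (a hypothetical caller sketch; the variable names
 * uid, bytes and flags are assumed and not part of this patch), an OSD
 * operation would typically drive the enforcement API as follows:
 *
 *     qi->lqi_id.qid_uid = uid;      (quota ID charged by the operation)
 *     qi->lqi_type       = USRQUOTA;
 *     qi->lqi_is_blk     = true;     (block quota, i.e. a dt device)
 *     qi->lqi_space      = bytes;    (space the operation will consume)
 *     rc = qsd_op_begin(env, qsd, trans, qi, &flags);
 *     if (rc == 0)
 *             (start, execute and stop the transaction)
 *     qsd_op_end(env, qsd, trans);   (once for the whole transaction)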
*/ struct qsd_instance *qsd_init(const struct lu_env *, char *, struct dt_device *, cfs_proc_dir_entry_t *); - int qsd_prepare(const struct lu_env *, struct qsd_instance *); - +int qsd_start(const struct lu_env *, struct qsd_instance *); void qsd_fini(const struct lu_env *, struct qsd_instance *); - -/* XXX: dummy qsd_op_begin() & qsd_op_end(), will be replaced with the real - * one once all the enforcement code landed. */ -static inline int qsd_op_begin(const struct lu_env *env, - struct qsd_instance *qsd, - struct lquota_trans *trans, - struct lquota_id_info *qi, - int *flags) -{ - return 0; -} - -static inline void qsd_op_end(const struct lu_env *env, - struct qsd_instance *qsd, - struct lquota_trans *trans) -{ -} +int qsd_op_begin(const struct lu_env *, struct qsd_instance *, + struct lquota_trans *, struct lquota_id_info *, int *); +void qsd_op_end(const struct lu_env *, struct qsd_instance *, + struct lquota_trans *); +void qsd_adjust_quota(const struct lu_env *, struct qsd_instance *, + union lquota_id *, int); /* * Quota information attached to a transaction diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c index a637f03..302f435 100644 --- a/lustre/ofd/ofd_dev.c +++ b/lustre/ofd/ofd_dev.c @@ -293,17 +293,35 @@ static struct lu_object *ofd_object_alloc(const struct lu_env *env, extern int ost_handle(struct ptlrpc_request *req); -static int ofd_start(const struct lu_env *env, struct lu_device *dev) +static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev, + struct lu_device *dev) { + struct ofd_thread_info *info; struct ofd_device *ofd = ofd_dev(dev); + struct obd_device *obd = ofd_obd(ofd); struct lu_device *next = &ofd->ofd_osd->dd_lu_dev; int rc; ENTRY; + rc = lu_env_refill((struct lu_env *)env); + if (rc != 0) { + CERROR("Failure to refill session: '%d'\n", rc); + RETURN(rc); + } + + info = ofd_info_init(env, NULL); + if (info == NULL) + RETURN(-EFAULT); + /* initialize lower device */ rc = next->ld_ops->ldo_prepare(env, dev, next); + target_recovery_init(&ofd->ofd_lut, ost_handle); + + if (obd->obd_recovering == 0) + ofd_postrecov(env, ofd); + RETURN(rc); } @@ -331,6 +349,7 @@ static struct lu_device_operations ofd_lu_ops = { .ldo_object_alloc = ofd_object_alloc, .ldo_process_config = ofd_process_config, .ldo_recovery_complete = ofd_recovery_complete, + .ldo_prepare = ofd_prepare, }; static int ofd_procfs_init(struct ofd_device *ofd) @@ -546,10 +565,6 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m, m->ofd_grant_ratio = ofd_grant_ratio_conv(m->ofd_dt_conf.ddp_grant_reserved); - rc = ofd_start(env, &m->ofd_dt_dev.dd_lu_dev); - if (rc) - GOTO(err_fini_stack, rc); - rc = lut_init(env, &m->ofd_lut, obd, m->ofd_osd); if (rc) GOTO(err_free_ns, rc); @@ -558,8 +573,6 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m, if (rc) GOTO(err_fini_lut, rc); - target_recovery_init(&m->ofd_lut, ost_handle); - RETURN(0); err_fini_lut: lut_fini(env, &m->ofd_lut); diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h index e130a08..cc5fe41 100644 --- a/lustre/ofd/ofd_internal.h +++ b/lustre/ofd/ofd_internal.h @@ -308,6 +308,7 @@ void ofd_free_capa_keys(struct ofd_device *ofd); /* ofd_dev.c */ extern struct lu_context_key ofd_thread_key; +int ofd_postrecov(const struct lu_env *env, struct ofd_device *ofd); /* ofd_obd.c */ extern struct obd_ops ofd_obd_ops; diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 556eb4b..c44524d 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -425,6 +425,14 @@ static 
int ofd_destroy_export(struct obd_export *exp)
 	return 0;
 }
 
+int ofd_postrecov(const struct lu_env *env, struct ofd_device *ofd)
+{
+	struct lu_device *ldev = &ofd->ofd_dt_dev.dd_lu_dev;
+
+	CDEBUG(D_HA, "%s: recovery is over\n", ofd_obd(ofd)->obd_name);
+	return ldev->ld_ops->ldo_recovery_complete(env, ldev);
+}
+
 int ofd_obd_postrecov(struct obd_device *obd)
 {
 	struct lu_env env;
@@ -438,7 +446,8 @@ int ofd_obd_postrecov(struct obd_device *obd)
 		RETURN(rc);
 	ofd_info_init(&env, obd->obd_self_export);
 
-	rc = ldev->ld_ops->ldo_recovery_complete(&env, ldev);
+	rc = ofd_postrecov(&env, ofd_dev(ldev));
+	lu_env_fini(&env);
 
 	RETURN(rc);
 }
diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c
index e5797c3..66600be 100644
--- a/lustre/osd-ldiskfs/osd_handler.c
+++ b/lustre/osd-ldiskfs/osd_handler.c
@@ -837,8 +837,24 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 	osd_index_fini(obj);
 	if (inode != NULL) {
+		struct qsd_instance	*qsd = osd_obj2dev(obj)->od_quota_slave;
+		qid_t			 uid = inode->i_uid;
+		qid_t			 gid = inode->i_gid;
+
 		iput(inode);
 		obj->oo_inode = NULL;
+
+		if (qsd != NULL) {
+			struct osd_thread_info	*info = osd_oti_get(env);
+			struct lquota_id_info	*qi = &info->oti_qi;
+
+			/* Release granted quota to master if necessary */
+			qi->lqi_id.qid_uid = uid;
+			qsd_adjust_quota(env, qsd, &qi->lqi_id, USRQUOTA);
+
+			qi->lqi_id.qid_uid = gid;
+			qsd_adjust_quota(env, qsd, &qi->lqi_id, GRPQUOTA);
+		}
 	}
 }
@@ -4624,7 +4640,17 @@ static int osd_process_config(const struct lu_env *env,
 static int osd_recovery_complete(const struct lu_env *env,
 				 struct lu_device *d)
 {
-	RETURN(0);
+	struct osd_device	*osd = osd_dev(d);
+	int			 rc = 0;
+	ENTRY;
+
+	if (osd->od_quota_slave == NULL)
+		RETURN(0);
+
+	/* start qsd instance on recovery completion; this notifies the quota
+	 * slave code that we are about to process new requests now */
+	rc = qsd_start(env, osd->od_quota_slave);
+	RETURN(rc);
 }
 
 /*
diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c
index 9f0c539..c107ff0 100644
--- a/lustre/osd-zfs/osd_handler.c
+++ b/lustre/osd-zfs/osd_handler.c
@@ -754,8 +754,17 @@ static int osd_process_config(const struct lu_env *env,
 static int osd_recovery_complete(const struct lu_env *env,
 				 struct lu_device *d)
 {
+	struct osd_device	*osd = osd_dev(d);
+	int			 rc = 0;
 	ENTRY;
-	RETURN(0);
+
+	if (osd->od_quota_slave == NULL)
+		RETURN(0);
+
+	/* start qsd instance on recovery completion; this notifies the quota
+	 * slave code that we are about to process new requests now */
+	rc = qsd_start(env, osd->od_quota_slave);
+	RETURN(rc);
 }
 
 /*
diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c
index 78d4943..e99b663 100644
--- a/lustre/osp/osp_dev.c
+++ b/lustre/osp/osp_dev.c
@@ -713,6 +713,10 @@ static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp,
 				  OBD_CONNECT_SKIP_ORPHAN |
 				  OBD_CONNECT_VERSION |
 				  OBD_CONNECT_FID;
+
+	if (is_osp_on_ost(osp->opd_obd->obd_name))
+		ocd->ocd_connect_flags |= OBD_CONNECT_LIGHTWEIGHT;
+
 	ocd->ocd_version = LUSTRE_VERSION_CODE;
 	LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX);
 	ocd->ocd_index = data->ocd_index;
diff --git a/lustre/quota/Makefile.in b/lustre/quota/Makefile.in
index 7b61887..4097c3c 100644
--- a/lustre/quota/Makefile.in
+++ b/lustre/quota/Makefile.in
@@ -1,17 +1,15 @@
 MODULES := lquota
 
-qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o qmt_entry.o qmt_pool.o
-
 quota-objs := lproc_quota.o lquota_lib.o lquota_disk.o lquota_entry.o
 
-qsd-objs := qsd_lib.o qsd_request.o qsd_entry.o qsd_lock.o
-qsd-objs
+= qsd_reint.o qsd_writeback.o qsd_config.o +qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o qmt_entry.o qmt_pool.o -lquota-objs := $(quota-objs) $(qsd-objs) +qsd-objs := qsd_lib.o qsd_request.o qsd_entry.o qsd_lock.o +qsd-objs += qsd_reint.o qsd_writeback.o qsd_config.o qsd_handler.o -lquota-objs += $(qmt-objs) +lquota-objs := $(quota-objs) $(qsd-objs) $(qmt-objs) -EXTRA_DIST := $(lquota-objs:%.o=%.c) lquota_internal.h qsd_internal.h -EXTRA_DIST += qmt_internal.h +EXTRA_DIST := $(lquota-objs:%.o=%.c) +EXTRA_DIST += lquota_internal.h qsd_internal.h qmt_internal.h @INCLUDE_RULES@ diff --git a/lustre/quota/lquota_internal.h b/lustre/quota/lquota_internal.h index 7b2bc57..6bd6924 100644 --- a/lustre/quota/lquota_internal.h +++ b/lustre/quota/lquota_internal.h @@ -265,6 +265,9 @@ static inline void lqe_read_unlock(struct lquota_entry *lqe) #define LQUOTA_LEAST_QUNIT(type) \ (type == LQUOTA_RES_MD ? (1 << 10) : toqb(PTLRPC_MAX_BRW_SIZE)) +#define LQUOTA_OVER_FL(type) \ + (type == USRQUOTA ? QUOTA_FL_OVER_USRQUOTA : QUOTA_FL_OVER_GRPQUOTA) + /* Common data shared by quota-level handlers. This is allocated per-thread to * reduce stack consumption */ struct lquota_thread_info { diff --git a/lustre/quota/qsd_config.c b/lustre/quota/qsd_config.c index 26c06bb..0ad0d64 100644 --- a/lustre/quota/qsd_config.c +++ b/lustre/quota/qsd_config.c @@ -136,6 +136,7 @@ int qsd_process_config(struct lustre_cfg *lcfg) char *cfgstr = lustre_cfg_string(lcfg, 1); char *keystr, *valstr; int rc, pool, enabled = 0; + bool reint = false; ENTRY; CDEBUG(D_QUOTA, "processing quota parameter: fs:%s cfgstr:%s\n", fsname, @@ -153,7 +154,7 @@ int qsd_process_config(struct lustre_cfg *lcfg) qfs = qsd_get_fsinfo(fsname, 0); if (qfs == NULL) { - CERROR("Fail to find quota filesystem information for %s\n", + CERROR("failed to find quota filesystem information for %s\n", fsname); RETURN(-ENOENT); } @@ -163,12 +164,42 @@ int qsd_process_config(struct lustre_cfg *lcfg) if (strchr(valstr, 'g')) enabled |= 1 << GRPQUOTA; + cfs_down(&qfs->qfs_sem); if (qfs->qfs_enabled[pool - LQUOTA_FIRST_RES] == enabled) /* no change required */ GOTO(out, rc = 0); + if ((qfs->qfs_enabled[pool - LQUOTA_FIRST_RES] & enabled) != enabled) + reint = true; + qfs->qfs_enabled[pool - LQUOTA_FIRST_RES] = enabled; + + /* trigger reintegration for all qsd */ + if (reint) { + struct qsd_instance *qsd; + struct qsd_qtype_info *qqi; + + cfs_list_for_each_entry(qsd, &qfs->qfs_qsd_list, qsd_link) { + bool skip = false; + int type; + + /* start reintegration only if qsd_prepare() was + * successfully called */ + cfs_read_lock(&qsd->qsd_lock); + if (!qsd->qsd_prepared) + skip = true; + cfs_read_unlock(&qsd->qsd_lock); + if (skip) + continue; + + for (type = USRQUOTA; type < MAXQUOTAS; type++) { + qqi = qsd->qsd_type_array[type]; + qsd_start_reint_thread(qqi); + } + } + } out: + cfs_up(&qfs->qfs_sem); qsd_put_fsinfo(qfs); RETURN(0); } diff --git a/lustre/quota/qsd_handler.c b/lustre/quota/qsd_handler.c new file mode 100644 index 0000000..b1ec4a0 --- /dev/null +++ b/lustre/quota/qsd_handler.c @@ -0,0 +1,956 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. 
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi
+ * Author: Niu Yawei
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "qsd_internal.h"
+
+/*
+ * helper function returning how much space is currently reserved for requests
+ * in flight.
+ */
+static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
+{
+	int	pending;
+
+	lqe_read_lock(lqe);
+	pending = lqe->lqe_pending_req;
+	lqe_read_unlock(lqe);
+
+	return pending;
+}
+
+/*
+ * helper function returning true when the connection to master is ready to be
+ * used.
+ */
+static inline int qsd_ready(struct qsd_instance *qsd)
+{
+	struct obd_import	*imp = NULL;
+
+	cfs_read_lock(&qsd->qsd_lock);
+	if (qsd->qsd_exp_valid)
+		imp = class_exp2cliimp(qsd->qsd_exp);
+	cfs_read_unlock(&qsd->qsd_lock);
+
+	return (imp == NULL || imp->imp_invalid) ? false : true;
+}
+
+/*
+ * Helper function returning true when quota space needs to be adjusted (i.e.
+ * some unused space should be released or more space pre-acquired) and false
+ * otherwise.
+ */
+static bool qsd_adjust_needed(struct lquota_entry *lqe)
+{
+	struct qsd_qtype_info	*qqi;
+	__u64			 usage, granted;
+
+	qqi = lqe2qqi(lqe);
+
+	if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
+		/* if quota isn't enforced for this id, no need to adjust.
+		 * Similarly, no need to perform adjustment if the target is in
+		 * the process of shutting down. */
+		return false;
+
+	usage  = lqe->lqe_usage;
+	usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
+	granted = lqe->lqe_granted - lqe->lqe_pending_rel;
+
+	/* need to re-acquire per-ID lock or release all grant */
+	if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
+	    lqe->lqe_granted > lqe->lqe_usage)
+		return true;
+
+	/* good old quota qunit adjustment logic which has been around since
+	 * lustre 1.4:
+	 * 1. Need to release some space? */
+	if (granted > usage + lqe->lqe_qunit)
+		return true;
+
+	/* 2. Any quota overrun? */
+	if (lqe->lqe_usage > lqe->lqe_granted)
+		/* we ended up consuming more than we own, we need to have this
+		 * fixed ASAP */
+		return true;
+
+	/* 3. Time to pre-acquire?
+	 */
+	if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
+	    granted < usage + lqe->lqe_qtune)
+		/* need to pre-acquire some space if we don't want to block
+		 * client's requests */
+		return true;
+
+	return false;
+}
+
+/*
+ * Callback function called when an acquire/release request sent to the master
+ * is completed
+ */
+static void qsd_dqacq_completion(const struct lu_env *env,
+				 struct qsd_qtype_info *qqi,
+				 struct quota_body *reqbody,
+				 struct quota_body *repbody,
+				 struct lustre_handle *lockh,
+				 union ldlm_wire_lvb *lvb,
+				 void *arg, int ret)
+{
+	struct lquota_entry	*lqe = (struct lquota_entry *)arg;
+	struct qsd_thread_info	*qti;
+	int			 rc;
+	bool			 adjust = false, cancel = false;
+	ENTRY;
+
+	LASSERT(qqi != NULL && lqe != NULL);
+
+	/* the environment passed by ptlrpcd is mostly used by CLIO and doesn't
+	 * have the DT tags set. */
+	rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
+	if (rc) {
+		LQUOTA_ERROR(lqe, "failed to refill environment %d", rc);
+		lqe_write_lock(lqe);
+		/* can't afford to adjust quota space with no suitable lu_env */
+		GOTO(out_noadjust, rc);
+	}
+	qti = qsd_info(env);
+
+	lqe_write_lock(lqe);
+
+	if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
+		LQUOTA_ERROR(lqe, "DQACQ failed with %d, op:%x", ret,
+			     reqbody->qb_flags);
+		GOTO(out, ret);
+	}
+
+	/* despite -EDQUOT & -EINPROGRESS errors, the master might still
+	 * grant us back quota space to adjust quota overrun */
+
+	LQUOTA_DEBUG(lqe, "DQACQ returned %d", ret);
+
+	/* Set the lqe_lockh */
+	if (lustre_handle_is_used(lockh) &&
+	    !lustre_handle_equal(lockh, &lqe->lqe_lockh))
+		lustre_handle_copy(&lqe->lqe_lockh, lockh);
+
+	/* If the replied qb_count is zero, it means the master didn't process
+	 * the DQACQ since the limit for this ID has been removed, so we
+	 * should not update the quota entry & slave index copy either. */
+	if (repbody != NULL && repbody->qb_count != 0) {
+		LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
+
+		if (req_is_rel(reqbody->qb_flags)) {
+			if (lqe->lqe_granted < repbody->qb_count) {
+				LQUOTA_ERROR(lqe, "can't release more space "
+					     "than owned "LPU64"<"LPU64,
+					     lqe->lqe_granted,
+					     repbody->qb_count);
+				lqe->lqe_granted = 0;
+			} else {
+				lqe->lqe_granted -= repbody->qb_count;
+			}
+			/* Proactively cancel the per-ID lock when there
+			 * isn't any usage & grant, which avoids the master
+			 * sending glimpses unnecessarily to this slave on
+			 * quota revoking */
+			if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
+			    !lqe->lqe_waiting_write && !lqe->lqe_usage)
+				cancel = true;
+		} else {
+			lqe->lqe_granted += repbody->qb_count;
+		}
+		qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
+		lqe_write_unlock(lqe);
+
+		/* Update the slave index file in the dedicated thread. So far,
+		 * we don't update the version of slave index copy on DQACQ.
+		 * No locking is necessary since nobody can change
+		 * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
+		qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
+				 false);
+		lqe_write_lock(lqe);
+	}
+
+	/* extract information from lvb */
+	if (ret == 0 && lvb != NULL) {
+		if (lvb->l_lquota.lvb_id_qunit != 0)
+			qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
+		if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
+			lqe->lqe_edquot = true;
+		else
+			lqe->lqe_edquot = false;
+	} else if (repbody != NULL && repbody->qb_qunit != 0) {
+		qsd_set_qunit(lqe, repbody->qb_qunit);
+	}
+
+	/* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
+	 * flooding the master with acquire requests.
Pre-acquire will be turned + * on again as soon as qunit is modified */ + if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT) + lqe->lqe_nopreacq = true; +out: + adjust = qsd_adjust_needed(lqe); +out_noadjust: + lqe->lqe_pending_req--; + lqe->lqe_pending_rel = 0; + lqe_write_unlock(lqe); + + cfs_waitq_broadcast(&lqe->lqe_waiters); + + /* release reference on per-ID lock */ + if (lustre_handle_is_used(lockh)) + ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode); + + if (cancel) { + qsd_adjust_schedule(lqe, false, true); + } else if (adjust) { + if (!ret || ret == -EDQUOT) + qsd_adjust_schedule(lqe, false, false); + else + qsd_adjust_schedule(lqe, true, false); + } + + if (lvb) + /* free lvb allocated in qsd_dqacq */ + OBD_FREE_PTR(lvb); + + lqe_putref(lqe); + EXIT; +} + +static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space) +{ + __u64 usage; + int rc; + ENTRY; + + if (!lqe->lqe_enforced) + /* not enforced any more, we are good */ + RETURN(0); + + lqe_write_lock(lqe); + /* use latest usage */ + usage = lqe->lqe_usage; + /* take pending write into account */ + usage += lqe->lqe_pending_write; + + if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) { + /* Yay! we got enough space */ + lqe->lqe_pending_write += space; + lqe->lqe_waiting_write -= space; + rc = 0; + } else if (lqe->lqe_edquot) { + rc = -EDQUOT; + } else { + rc = -EAGAIN; + } + lqe_write_unlock(lqe); + + RETURN(rc); +} + +static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op, + struct quota_body *qbody) +{ + struct qsd_qtype_info *qqi; + __u64 usage, granted; + + if (!lqe->lqe_enforced && op != QSD_REL) + return 0; + + qqi = lqe2qqi(lqe); + + LASSERT(lqe->lqe_pending_rel == 0); + usage = lqe->lqe_usage; + usage += lqe->lqe_pending_write + lqe->lqe_waiting_write; + granted = lqe->lqe_granted; + + qbody->qb_flags = 0; +again: + switch (op) { + case QSD_ACQ: + /* if we overconsumed quota space, we report usage in request + * so that master can adjust it unconditionally */ + if (lqe->lqe_usage > lqe->lqe_granted) { + qbody->qb_usage = lqe->lqe_usage; + qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; + granted = lqe->lqe_usage; + } + /* acquire as much as needed, but not more */ + if (usage > granted) { + qbody->qb_count = usage - granted; + qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ; + } + break; + case QSD_REP: + /* When reporting quota (during reintegration or on setquota + * glimpse), we should release granted space if usage is 0. + * Otherwise, if the usage is less than granted, we need to + * acquire the per-ID lock to make sure the unused grant can be + * reclaimed by per-ID lock glimpse. 
+		 */
+		if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
+			LQUOTA_DEBUG(lqe, "Release on report!");
+			GOTO(again, op = QSD_REL);
+		} else if (lqe->lqe_usage == lqe->lqe_granted) {
+			LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
+				     "anything on report!");
+		} else if (lqe->lqe_usage < lqe->lqe_granted) {
+			LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
+			qbody->qb_count = 0;
+			qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
+		} else {
+			LASSERT(lqe->lqe_usage > lqe->lqe_granted);
+			LQUOTA_DEBUG(lqe, "Reporting usage");
+			qbody->qb_usage = lqe->lqe_usage;
+			qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+		}
+		break;
+	case QSD_REL:
+		/* release unused quota space unconditionally */
+		if (lqe->lqe_granted > lqe->lqe_usage) {
+			qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
+			qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+		}
+		break;
+	case QSD_ADJ: {
+		/* need to re-acquire per-ID lock or release all grant */
+		if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
+		    lqe->lqe_granted > lqe->lqe_usage)
+			GOTO(again, op = QSD_REP);
+
+		/* release spare grant */
+		if (granted > usage + lqe->lqe_qunit) {
+			/* pre-release quota space */
+			qbody->qb_count = granted - usage;
+			/* if usage == 0, release all granted space */
+			if (usage) {
+				/* try to keep one qunit of quota space */
+				qbody->qb_count -= lqe->lqe_qunit;
+				/* but don't release less than qtune to avoid
+				 * releasing space too often */
+				if (qbody->qb_count < lqe->lqe_qtune)
+					qbody->qb_count = lqe->lqe_qtune;
+			}
+			qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+			break;
+		}
+
+		/* if we overconsumed quota space, we report usage in request
+		 * so that master can adjust it unconditionally */
+		if (lqe->lqe_usage > lqe->lqe_granted) {
+			qbody->qb_usage = lqe->lqe_usage;
+			qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+			granted = lqe->lqe_usage;
+		}
+
+		if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
+		    lustre_handle_is_used(&lqe->lqe_lockh) &&
+		    lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
+			/* To pre-acquire quota space, we report how much spare
+			 * quota space the slave currently owns, and the master
+			 * will grant us back how much we can claim given the
+			 * current state of affairs */
+			if (granted <= usage)
+				qbody->qb_count = 0;
+			else
+				qbody->qb_count = granted - usage;
+			qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
+		}
+		break;
+	}
+	default:
+		CERROR("Invalid qsd operation:%u\n", op);
+		LBUG();
+		break;
+	}
+	return qbody->qb_flags != 0;
+}
+
+/*
+ * Acquire/release quota space from master.
+ * There is at most one in-flight dqacq/dqrel request at any time.
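+ *
+ * For reference (summary added for illustration; see qsd_calc_space()
+ * above for the exact accounting):
+ *   QSD_ACQ - acquire what is missing, i.e. usage (including pending &
+ *             waiting writes) minus granted, reporting usage on overrun;
+ *   QSD_REL - unconditionally release the unused grant (granted - usage);
+ *   QSD_REP - report usage; turns into a release when usage is 0 and into
+ *             a per-ID lock acquisition when part of the grant is unused;
+ *   QSD_ADJ - re-balance, releasing spare grant above usage + qunit or
+ *             pre-acquiring when granted falls below usage + qtune.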
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the qid entry to be processed
+ * \param op  - operation that the caller wants to perform
+ *
+ * \retval 0            - success
+ * \retval -EDQUOT      : out of quota
+ *         -EINPROGRESS : inform client to retry write/create
+ *         -ve          : other appropriate errors
+ */
+int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
+	      enum qsd_ops op)
+{
+	struct qsd_thread_info	*qti = qsd_info(env);
+	struct quota_body	*qbody = &qti->qti_body;
+	struct qsd_instance	*qsd;
+	struct qsd_qtype_info	*qqi;
+	struct ldlm_lock	*lock;
+	int			 rc;
+	bool			 intent = false, sync;
+	ENTRY;
+
+	qqi = lqe2qqi(lqe);
+	qsd = qqi->qqi_qsd;
+
+	if (qsd->qsd_stopping) {
+		LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
+		/* Target is about to shut down, client will retry */
+		RETURN(-EINPROGRESS);
+	}
+
+	if (!qsd_ready(qsd)) {
+		LQUOTA_DEBUG(lqe, "Connection to master not ready");
+		RETURN(-ENOTCONN);
+	}
+
+	/* In most cases, reintegration has already been triggered (when quota
+	 * is enabled or on OST start). However, in a rare race condition
+	 * (enabling quota while OSTs are starting), we might miss triggering
+	 * reintegration for some qqi.
+	 *
+	 * If the previous reintegration failed for some reason, we'll
+	 * re-trigger it here as well. */
+	if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
+		LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
+			     " off reintegration");
+		qsd_start_reint_thread(qqi);
+		RETURN(-EINPROGRESS);
+	}
+
+	LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);
+
+	/* Fill in the remote global lock handle; the master will check this
+	 * handle to see if the slave is sending requests with a stale lock */
+	cfs_read_lock(&qsd->qsd_lock);
+	lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
+	cfs_read_unlock(&qsd->qsd_lock);
+
+	if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
+		RETURN(-ENOLCK);
+
+	lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
+	if (lock == NULL)
+		RETURN(-ENOLCK);
+	lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
+	LDLM_LOCK_PUT(lock);
+
+	/* We allow only one in-flight dqacq/dqrel for a given qid. If there
+	 * is already an in-flight dqacq/dqrel:
+	 *
+	 * - For QSD_ADJ: we should just abort it, since the local limit is
+	 *   going to be changed soon;
+	 * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel
+	 *   to finish, and return success to the caller. The caller is
+	 *   responsible for retrying;
+	 * - For QSD_REP: we should just abort it, since the slave has already
+	 *   acquired/released grant; */
+	sync = (op == QSD_ACQ || op == QSD_REL) ?
true : false; + LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d", + lqe->lqe_pending_req); + + lqe_write_lock(lqe); + if (lqe->lqe_pending_req != 0) { + struct l_wait_info lwi = { 0 }; + + lqe_write_unlock(lqe); + if (!sync) { + LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op); + RETURN(0); + } + + LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel"); + l_wait_event(lqe->lqe_waiters, + !lqe_pending_dqacq(lqe) || qsd->qsd_stopping, + &lwi); + RETURN(0); + } + + /* fill qb_count & qb_flags */ + if (!qsd_calc_space(lqe, op, qbody)) { + lqe_write_unlock(lqe); + LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op); + RETURN(0); + } + lqe->lqe_pending_req++; + lqe_write_unlock(lqe); + + /* fill other quota body fields */ + qbody->qb_fid = qqi->qqi_fid; + qbody->qb_id = lqe->lqe_id; + memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh)); + memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh)); + + /* hold a refcount until completion */ + lqe_getref(lqe); + + if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) { + /* check whether we already own a lock for this ID */ + lqe_read_lock(lqe); + lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh); + lqe_read_unlock(lqe); + + rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh); + if (rc) { + memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh)); + if (req_is_preacq(qbody->qb_flags)) { + if (req_has_rep(qbody->qb_flags)) + /* still want to report usage */ + qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; + else + /* no pre-acquire if no per-ID lock */ + GOTO(out, rc = -ENOLCK); + } else { + /* no lock found, should use intent */ + intent = true; + } + } else if (req_is_acq(qbody->qb_flags) && + qbody->qb_count == 0) { + /* found cached lock, no need to acquire */ + GOTO(out, rc = 0); + } + } + + if (!intent) { + rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync, + qsd_dqacq_completion, qqi, &qti->qti_lockh, + lqe); + } else { + union ldlm_wire_lvb *lvb; + + OBD_ALLOC_PTR(lvb); + if (lvb == NULL) + GOTO(out, rc = -ENOMEM); + + rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync, + IT_QUOTA_DQACQ, qsd_dqacq_completion, + qqi, lvb, (void *)lqe); + } + /* the completion function will be called by qsd_send_dqacq or + * qsd_intent_lock */ + RETURN(rc); +out: + qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe, + rc); + return rc; +} + +/* + * Quota enforcement handler. If local quota can satisfy this operation, + * return success, otherwise, acquire more quota from master. + * (for write operation, if master isn't available at this moment, return + * -EINPROGRESS to inform client to retry the write) + * + * \param env - the environment passed by the caller + * \param qsd - is the qsd instance associated with the device in charge + * of the operation. 
+ * \param qid   - is the qid information attached in the transaction handle
+ * \param space - is the space required by the operation
+ * \param flags - if the operation is a write, returns the user/group
+ *                over-quota flags and the sync commit flag to the caller
+ *
+ * \retval 0            - success
+ * \retval -EDQUOT      : out of quota
+ *         -EINPROGRESS : inform client to retry write
+ *         -ve          : other appropriate errors
+ */
+static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
+			 struct lquota_id_info *qid, long long space,
+			 int *flags)
+{
+	struct lquota_entry	*lqe;
+	int			 rc = 0, retry_cnt;
+	ENTRY;
+
+	if (qid->lqi_qentry != NULL) {
+		/* we already had to deal with this id for this transaction */
+		lqe = qid->lqi_qentry;
+		if (!lqe->lqe_enforced)
+			RETURN(0);
+	} else {
+		/* look up lquota entry associated with qid */
+		lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
+		if (IS_ERR(lqe))
+			RETURN(PTR_ERR(lqe));
+		if (!lqe->lqe_enforced) {
+			lqe_putref(lqe);
+			RETURN(0);
+		}
+		qid->lqi_qentry = lqe;
+		/* lqe will be released in qsd_op_end() */
+	}
+
+	if (space <= 0) {
+		/* when space is negative or zero, we don't need to consume
+		 * quota space. That said, we still want to perform space
+		 * adjustments in qsd_op_end, so we return here, but with
+		 * a reference on the lqe */
+		if (flags != NULL) {
+			rc = qsd_refresh_usage(env, lqe);
+			GOTO(out_flags, rc);
+		}
+		RETURN(0);
+	}
+
+	LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
+
+	lqe_write_lock(lqe);
+	lqe->lqe_waiting_write += space;
+	lqe_write_unlock(lqe);
+
+	for (retry_cnt = 0; rc == 0; retry_cnt++) {
+		/* refresh disk usage if required */
+		rc = qsd_refresh_usage(env, lqe);
+		if (rc)
+			break;
+
+		/* try to consume local quota space */
+		rc = qsd_acquire_local(lqe, space);
+		if (rc != -EAGAIN)
+			/* rc == 0, Wouhou! enough local quota space
+			 * rc < 0, something bad happened */
+			break;
+
+		/* need to acquire more quota space from the master; this is
+		 * done synchronously */
+		rc = qsd_dqacq(env, lqe, QSD_ACQ);
+		LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
+			     retry_cnt, rc);
+	}
+
+	if (rc == 0) {
+		qid->lqi_space += space;
+	} else {
+		LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);
+
+		lqe_write_lock(lqe);
+		lqe->lqe_waiting_write -= space;
+
+		if (flags && lqe->lqe_pending_write != 0)
+			/* Inform the OSD layer that there are pending writes.
+			 * It might want to retry after a sync if appropriate */
+			*flags |= QUOTA_FL_SYNC;
+		lqe_write_unlock(lqe);
+
+		/* convert recoverable errors into -EINPROGRESS; the client
+		 * will retry the write on -EINPROGRESS. */
+		if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
+		    rc == -EAGAIN || rc == -EINTR)
+			rc = -EINPROGRESS;
+	}
+
+	if (flags != NULL) {
+out_flags:
+		LASSERT(qid->lqi_is_blk);
+		if (rc != 0) {
+			*flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+		} else {
+			__u64	usage;
+
+			lqe_read_lock(lqe);
+			usage  = lqe->lqe_usage;
+			usage += lqe->lqe_pending_write;
+			usage += lqe->lqe_waiting_write;
+			usage += qqi->qqi_qsd->qsd_sync_threshold;
+
+			/* if we should notify client to start sync write */
+			if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
+				*flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+			else
+				*flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
+			lqe_read_unlock(lqe);
+		}
+	}
+	RETURN(rc);
+}
+
+static inline bool qid_equal(struct lquota_id_info *q1,
+			     struct lquota_id_info *q2)
+{
+	if (q1->lqi_type != q2->lqi_type)
+		return false;
+	return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
+}
+
+/*
+ * Enforce quota; this is called in the declaration of each operation.
+ * qsd_op_end() will then be called later once all the operations have been
+ * completed in order to release/adjust the quota space.
+ *
+ * \param env   - the environment passed by the caller
+ * \param qsd   - is the qsd instance associated with the device in charge
+ *                of the operation.
+ * \param trans - is the quota transaction information
+ * \param qi    - qid & space required by current operation
+ * \param flags - if the operation is a write, returns the user/group
+ *                over-quota flags and the sync commit flag to the caller
+ *
+ * \retval 0            - success
+ * \retval -EDQUOT      : out of quota
+ *         -EINPROGRESS : inform client to retry write
+ *         -ve          : other appropriate errors
+ */
+int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
+		 struct lquota_trans *trans, struct lquota_id_info *qi,
+		 int *flags)
+{
+	struct qsd_qtype_info	*qqi;
+	int			 i, rc;
+	bool			 found = false;
+	ENTRY;
+
+	if (unlikely(qsd == NULL))
+		RETURN(0);
+
+	/* We don't enforce quota until the qsd_instance is started */
+	cfs_read_lock(&qsd->qsd_lock);
+	if (!qsd->qsd_started) {
+		cfs_read_unlock(&qsd->qsd_lock);
+		RETURN(0);
+	}
+	cfs_read_unlock(&qsd->qsd_lock);
+
+	/* ignore block quota on MDTs, ignore inode quota on OSTs */
+	if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
+	    (qsd->qsd_is_md && qi->lqi_is_blk))
+		RETURN(0);
+
+	qqi = qsd->qsd_type_array[qi->lqi_type];
+
+	/* ignore quota enforcement request when:
+	 *    - quota isn't enforced for this quota type
+	 * or - we failed to access the accounting object for this quota type
+	 * or - the space to acquire is zero
+	 * or - the user/group is root */
+	if (!qsd_type_enabled(qsd, qi->lqi_type) || qqi->qqi_acct_obj == NULL ||
+	    qi->lqi_id.qid_uid == 0)
+		RETURN(0);
+
+	LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
+		 trans->lqt_id_cnt);
+	/* check whether we already allocated a slot for this id */
+	for (i = 0; i < trans->lqt_id_cnt; i++) {
+		if (qid_equal(qi, &trans->lqt_ids[i])) {
+			found = true;
+			/* make sure we are not mixing inodes & blocks */
+			LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
+			break;
+		}
+	}
+
+	if (!found) {
+		if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
+			CERROR("%s: more than %d qids enforced for a "
+			       "transaction?\n", qsd->qsd_svname, i);
+			RETURN(-EINVAL);
+		}
+
+		/* fill new slot */
+		trans->lqt_ids[i].lqi_id     = qi->lqi_id;
+		trans->lqt_ids[i].lqi_type   = qi->lqi_type;
+		trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
+		trans->lqt_id_cnt++;
+	}
+
+	/* manage quota enforcement for this ID */
+	rc = qsd_op_begin0(env, qqi, &trans->lqt_ids[i], qi->lqi_space, flags);
+
+	RETURN(rc);
+}
+EXPORT_SYMBOL(qsd_op_begin);
+
+/**
+ * Post quota operation, pre-acquire/release quota from master.
+ *
+ * \param env - the environment passed by the caller
+ * \param qsd - is the qsd instance attached to the OSD device which
+ *              is handling the operation.
+ * \param qqi - is the qsd_qtype_info structure associated with the quota ID
+ *              subject to the operation
+ * \param qid - stores information related to this ID for the operation
+ *              which has just completed
+ */
+static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
+			struct lquota_id_info *qid)
+{
+	struct lquota_entry	*lqe;
+	bool			 adjust;
+	ENTRY;
+
+	lqe = qid->lqi_qentry;
+	if (lqe == NULL)
+		RETURN_EXIT;
+	qid->lqi_qentry = NULL;
+
+	/* refresh cached usage if a suitable environment is passed */
+	if (env != NULL)
+		qsd_refresh_usage(env, lqe);
+
+	lqe_write_lock(lqe);
+	if (qid->lqi_space > 0)
+		lqe->lqe_pending_write -= qid->lqi_space;
+	if (env != NULL)
+		adjust = qsd_adjust_needed(lqe);
+	else
+		adjust = true;
+	lqe_write_unlock(lqe);
+
+	if (adjust) {
+		/* pre-acquire/release quota space is needed */
+		if (env != NULL)
+			qsd_dqacq(env, lqe, QSD_ADJ);
+		else
+			/* no suitable environment, handle adjustment in
+			 * separate thread context */
+			qsd_adjust_schedule(lqe, false, false);
+	}
+	lqe_putref(lqe);
+	EXIT;
+}
+
+/*
+ * Post-operation quota processing; called after each operation's transaction
+ * has stopped.
+ *
+ * \param env   - the environment passed by the caller
+ * \param qsd   - is the qsd instance associated with device which is handling
+ *                the operation.
+ * \param trans - is the quota transaction holding all qid information
+ *                attached in the transaction handle
+ */
+void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
+		struct lquota_trans *trans)
+{
+	int	i;
+	ENTRY;
+
+	if (unlikely(qsd == NULL))
+		RETURN_EXIT;
+
+	/* We don't enforce quota until the qsd_instance is started */
+	cfs_read_lock(&qsd->qsd_lock);
+	if (!qsd->qsd_started) {
+		cfs_read_unlock(&qsd->qsd_lock);
+		RETURN_EXIT;
+	}
+	cfs_read_unlock(&qsd->qsd_lock);
+
+	LASSERT(trans != NULL);
+
+	for (i = 0; i < trans->lqt_id_cnt; i++) {
+		struct qsd_qtype_info *qqi;
+
+		if (trans->lqt_ids[i].lqi_qentry == NULL)
+			continue;
+
+		qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
+		qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
+	}
+
+	/* reset id_count to 0 so that a second accidental call to qsd_op_end()
+	 * does not result in failure */
+	trans->lqt_id_cnt = 0;
+	EXIT;
+}
+EXPORT_SYMBOL(qsd_op_end);
+
+void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
+		      union lquota_id *qid, int qtype)
+{
+	struct lquota_entry	*lqe;
+	struct qsd_qtype_info	*qqi;
+	bool			 adjust;
+	ENTRY;
+
+	if (unlikely(qsd == NULL))
+		RETURN_EXIT;
+
+	/* We don't enforce quota until the qsd_instance is started */
+	cfs_read_lock(&qsd->qsd_lock);
+	if (!qsd->qsd_started) {
+		cfs_read_unlock(&qsd->qsd_lock);
+		RETURN_EXIT;
+	}
+	cfs_read_unlock(&qsd->qsd_lock);
+
+	qqi = qsd->qsd_type_array[qtype];
+	LASSERT(qqi);
+
+	if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
+	    qid->qid_uid == 0)
+		RETURN_EXIT;
+
+	lqe = lqe_locate(env, qqi->qqi_site, qid);
+	if (IS_ERR(lqe)) {
+		CERROR("%s: failed to locate lqe for id:"LPU64", type:%d\n",
+		       qsd->qsd_svname, qid->qid_uid, qtype);
+		RETURN_EXIT;
+	}
+
+	qsd_refresh_usage(env, lqe);
+
+	lqe_read_lock(lqe);
+	adjust = qsd_adjust_needed(lqe);
+	lqe_read_unlock(lqe);
+
+	if (adjust)
+		qsd_dqacq(env, lqe, QSD_ADJ);
+
+	lqe_putref(lqe);
+	EXIT;
+}
+EXPORT_SYMBOL(qsd_adjust_quota);
diff --git a/lustre/quota/qsd_internal.h b/lustre/quota/qsd_internal.h
index f37a5a8..2a84bf6 100644
--- a/lustre/quota/qsd_internal.h
+++ b/lustre/quota/qsd_internal.h
@@ -55,6 +55,12 @@ struct qsd_instance {
 	 * are exported */
 	cfs_proc_dir_entry_t	*qsd_proc;
 
+	/* export used for the connection to quota master */
+	struct obd_export	*qsd_exp;
+
+	/* ldlm namespace used for quota locks */
+	struct ldlm_namespace	*qsd_ns;
+
 	/* on-disk directory where to store index files for this qsd instance */
 	struct dt_object	*qsd_root;
 
@@ -66,23 +72,41 @@ struct qsd_instance {
 	 * future. For the time being, we can just use an array. */
 	struct qsd_qtype_info	*qsd_type_array[MAXQUOTAS];
 
+	/* per-filesystem quota information */
+	struct qsd_fsinfo	*qsd_fsinfo;
+
+	/* link into qfs_qsd_list of qfs_fsinfo */
+	cfs_list_t		 qsd_link;
+
+	/* list of lqe entry which might need quota space adjustment */
+	cfs_list_t		 qsd_adjust_list;
+
+	/* lock protecting adjust list */
+	cfs_spinlock_t		 qsd_adjust_lock;
+
+	/* dedicated thread for updating slave index files. */
+	struct ptlrpc_thread	 qsd_upd_thread;
+
+	/* list of update tasks */
+	cfs_list_t		 qsd_upd_list;
+
 	/* r/w spinlock protecting:
 	 * - the state flags
 	 * - the qsd update list
 	 * - the deferred list
-	 * - flags of the qsd_qtype_info
-	 *
-	 * probably way too much :(
-	 */
+	 * - flags of the qsd_qtype_info */
 	cfs_rwlock_t		 qsd_lock;
 
-	/* per-filesystem quota information */
-	struct qsd_fsinfo	*qsd_fsinfo;
-
-	/* link into qfs_qsd_list of qfs_fsinfo */
-	cfs_list_t		 qsd_link;
+	/* Default quota settings which apply to all identifiers */
+	/* when blk qunit reaches this value, later write reqs from client
+	 * should be sync. b=16642 */
+	unsigned long		 qsd_sync_threshold;
 
 	unsigned long		 qsd_is_md:1,    /* managing quota for mdt */
+				 qsd_started:1,  /* instance is now started */
+				 qsd_prepared:1, /* qsd_prepare() successfully
+						  * called */
+				 qsd_exp_valid:1,/* qsd_exp is now valid */
 				 qsd_stopping:1; /* qsd_instance is stopping */
 };
 
@@ -102,6 +126,9 @@ struct qsd_qtype_info {
 	/* Global index FID to use for this quota type */
 	struct lu_fid		 qqi_fid;
 
+	/* Slave index FID allocated by the master */
+	struct lu_fid		 qqi_slv_fid;
+
 	/* back pointer to qsd device
 	 * immutable after creation. */
 	struct qsd_instance	*qqi_qsd;
@@ -118,6 +145,21 @@ struct qsd_qtype_info {
 	__u64			 qqi_slv_ver; /* slave index version */
 	__u64			 qqi_glb_ver; /* global index version */
 
+	/* per quota ID information. All lquota entries are kept in a hash
+	 * table and read from disk on cache miss. */
+	struct lquota_site	*qqi_site;
+
+	/* Reintegration thread */
+	struct ptlrpc_thread	 qqi_reint_thread;
+
+	/* statistics on operations performed by this slave */
+	struct lprocfs_stats	*qqi_stats;
+
+	/* deferred update for the global index copy */
+	cfs_list_t		 qqi_deferred_glb;
+	/* deferred update for the slave index copy */
+	cfs_list_t		 qqi_deferred_slv;
+
 	/* Various flags representing the current state of the slave for this
 	 * quota type. */
 	unsigned long		 qqi_glb_uptodate:1, /* global index uptodate
@@ -185,6 +227,17 @@ enum qsd_ops {
 #define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
 
+/* update record for slave & global index copy */
+struct qsd_upd_rec {
+	cfs_list_t		 qur_link; /* link into qsd_upd_list */
+	union lquota_id		 qur_qid;
+	union lquota_rec	 qur_rec;
+	struct qsd_qtype_info	*qur_qqi;
+	struct lquota_entry	*qur_lqe;
+	__u64			 qur_ver;
+	bool			 qur_global;
+};
+
 /* Common data shared by qsd-level handlers.
This is allocated per-thread to * reduce stack consumption. */ struct qsd_thread_info { @@ -265,6 +318,8 @@ static inline void qsd_set_qunit(struct lquota_entry *lqe, __u64 qunit) lqe->lqe_nopreacq = false; } +#define QSD_WB_INTERVAL 15 /* 15 seconds */ + /* qsd_entry.c */ extern struct lquota_entry_operations qsd_lqe_ops; int qsd_refresh_usage(const struct lu_env *, struct lquota_entry *); @@ -311,11 +366,14 @@ void qsd_put_fsinfo(struct qsd_fsinfo *); int qsd_process_config(struct lustre_cfg *); /* qsd_handler.c */ -/* XXX to be replaced with real function once qsd_handler landed */ -static inline int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe, - enum qsd_ops op) -{ - return 0; -} +int qsd_dqacq(const struct lu_env *, struct lquota_entry *, enum qsd_ops); +__u64 qsd_calc_grants(struct lquota_entry *, __u64, __u32); +/* qsd_writeback.c */ +void qsd_upd_schedule(struct qsd_qtype_info *, struct lquota_entry *, + union lquota_id *, union lquota_rec *, __u64, bool); +void qsd_bump_version(struct qsd_qtype_info *, __u64, bool); +int qsd_start_upd_thread(struct qsd_instance *); +void qsd_stop_upd_thread(struct qsd_instance *); +void qsd_adjust_schedule(struct lquota_entry *, bool, bool); #endif /* _QSD_INTERNAL_H */ diff --git a/lustre/quota/qsd_lib.c b/lustre/quota/qsd_lib.c index 65208d2..c1ba0c3 100644 --- a/lustre/quota/qsd_lib.c +++ b/lustre/quota/qsd_lib.c @@ -58,6 +58,19 @@ #include #include "qsd_internal.h" +cfs_mem_cache_t *upd_kmem; + +struct lu_kmem_descr qsd_caches[] = { + { + .ckd_cache = &upd_kmem, + .ckd_name = "upd_kmem", + .ckd_size = sizeof(struct qsd_upd_rec) + }, + { + .ckd_cache = NULL + } +}; + /* define qsd thread key */ LU_KEY_INIT_FINI(qsd, struct qsd_thread_info); LU_CONTEXT_KEY_DEFINE(qsd, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL); @@ -68,16 +81,40 @@ static int lprocfs_qsd_rd_state(char *page, char **start, off_t off, int count, int *eof, void *data) { struct qsd_instance *qsd = (struct qsd_instance *)data; + char enabled[5]; + int rc; LASSERT(qsd != NULL); - return snprintf(page, count, - "target name: %s\n" - "pool ID: %d\n" - "type: %s\n" - "quota enabled: none\n", - qsd->qsd_svname, qsd->qsd_pool_id, - qsd->qsd_is_md ? "md" : "dt"); + memset(enabled, 0, sizeof(enabled)); + if (qsd_type_enabled(qsd, USRQUOTA)) + strcat(enabled, "u"); + if (qsd_type_enabled(qsd, GRPQUOTA)) + strcat(enabled, "g"); + if (strlen(enabled) == 0) + strcat(enabled, "none"); + + rc = snprintf(page, count, + "target name: %s\n" + "pool ID: %d\n" + "type: %s\n" + "quota enabled: %s\n" + "conn to master: %s\n", + qsd->qsd_svname, qsd->qsd_pool_id, + qsd->qsd_is_md ? "md" : "dt", enabled, + qsd->qsd_exp_valid ? "setup" : "not setup yet"); + + if (qsd->qsd_prepared) + rc += snprintf(page + rc, count - rc, + "user uptodate: glb[%d],slv[%d],reint[%d]\n" + "group uptodate: glb[%d],slv[%d],reint[%d]\n", + qsd->qsd_type_array[USRQUOTA]->qqi_glb_uptodate, + qsd->qsd_type_array[USRQUOTA]->qqi_slv_uptodate, + qsd->qsd_type_array[USRQUOTA]->qqi_reint, + qsd->qsd_type_array[GRPQUOTA]->qqi_glb_uptodate, + qsd->qsd_type_array[GRPQUOTA]->qqi_slv_uptodate, + qsd->qsd_type_array[GRPQUOTA]->qqi_reint); + return rc; } static int lprocfs_qsd_rd_enabled(char *page, char **start, off_t off, @@ -105,6 +142,47 @@ static struct lprocfs_vars lprocfs_quota_qsd_vars[] = { }; /* + * Callback function invoked by the OSP layer when the connection to the master + * has been set up. 
+ * + * \param data - is a pointer to the qsd_instance + * + * \retval - 0 on success, appropriate error on failure + */ +static int qsd_conn_callback(void *data) +{ + struct qsd_instance *qsd = (struct qsd_instance *)data; + int type; + ENTRY; + + /* qsd_exp should now be valid */ + LASSERT(qsd->qsd_exp); + + /* grab reference on namespace */ + ldlm_namespace_get(class_exp2obd(qsd->qsd_exp)->obd_namespace); + qsd->qsd_ns = class_exp2obd(qsd->qsd_exp)->obd_namespace; + + cfs_write_lock(&qsd->qsd_lock); + /* notify that qsd_exp is now valid */ + qsd->qsd_exp_valid = true; + cfs_write_unlock(&qsd->qsd_lock); + + /* Now that the connection to master is setup, we can initiate the + * reintegration procedure for quota types which are enabled. + * It is worth noting that, if the qsd_instance hasn't been started + * already, then we can only complete the first two steps of the + * reintegration procedure (i.e. global lock enqueue and slave + * index transfer) since the space usage reconciliation (i.e. + * step 3) will have to wait for qsd_start() to be called */ + for (type = USRQUOTA; type < MAXQUOTAS; type++) { + struct qsd_qtype_info *qqi = qsd->qsd_type_array[type]; + cfs_waitq_signal(&qqi->qqi_reint_thread.t_ctl_waitq); + } + + RETURN(0); +} + +/* * Release qsd_qtype_info structure which contains data associated with a * given quota type. This releases the accounting objects. * It's called on OSD cleanup when the qsd instance is released. @@ -125,6 +203,16 @@ static void qsd_qtype_fini(const struct lu_env *env, struct qsd_instance *qsd, qqi = qsd->qsd_type_array[qtype]; qsd->qsd_type_array[qtype] = NULL; + /* all deferred work lists should be empty */ + LASSERT(cfs_list_empty(&qqi->qqi_deferred_glb)); + LASSERT(cfs_list_empty(&qqi->qqi_deferred_slv)); + + /* shutdown lquota site */ + if (qqi->qqi_site != NULL && !IS_ERR(qqi->qqi_site)) { + lquota_site_free(env, qqi->qqi_site); + qqi->qqi_site = NULL; + } + /* by now, all qqi users should have gone away */ LASSERT(cfs_atomic_read(&qqi->qqi_ref) == 1); lu_ref_fini(&qqi->qqi_reference); @@ -193,17 +281,22 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd, qqi->qqi_glb_uptodate = false; qqi->qqi_slv_uptodate = false; qqi->qqi_reint = false; + cfs_waitq_init(&qqi->qqi_reint_thread.t_ctl_waitq); + thread_set_flags(&qqi->qqi_reint_thread, SVC_STOPPED); + CFS_INIT_LIST_HEAD(&qqi->qqi_deferred_glb); + CFS_INIT_LIST_HEAD(&qqi->qqi_deferred_slv); memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh)); - /* open accounting object */ - LASSERT(qqi->qqi_acct_obj == NULL); - qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev, - qtype == USRQUOTA ? ACCT_USER_OID - : ACCT_GROUP_OID); - /* don't print any error message on failure in order not to confuse - * non-OFD user (e.g. 2.3 MDT stack) */ - if (IS_ERR(qqi->qqi_acct_obj)) - qqi->qqi_acct_obj = NULL; + /* open accounting object */ + LASSERT(qqi->qqi_acct_obj == NULL); + qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev, qtype); + if (qqi->qqi_acct_obj == NULL) { + LCONSOLE_ERROR("%s: No %s space accounting support. 
Please use " + "tunefs.lustre --quota option to enable quota " + "accounting.\n", + qsd->qsd_svname, QTYPE_NAME(qtype)); + GOTO(out, rc = -ENOENT); + } /* open global index copy */ LASSERT(qqi->qqi_glb_obj == NULL); @@ -233,17 +326,24 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd, } qqi->qqi_slv_ver = dt_version_get(env, qqi->qqi_slv_obj); + /* allocate site */ + qqi->qqi_site = lquota_site_alloc(env, qqi, false, qtype, &qsd_lqe_ops); + if (IS_ERR(qqi->qqi_site)) { + CERROR("%s: can't allocate site "DFID" %ld\n", qsd->qsd_svname, + PFID(&qqi->qqi_fid), PTR_ERR(qqi->qqi_site)); + GOTO(out, rc = PTR_ERR(qqi->qqi_site)); + } + /* register proc entry for accounting object */ rc = lprocfs_seq_create(qsd->qsd_proc, qtype == USRQUOTA ? "acct_user" : "acct_group", 0444, &lprocfs_quota_seq_fops, qqi->qqi_acct_obj); if (rc) { - CWARN("%s: can't add procfs entry for accounting file %d\n", - qsd->qsd_svname, rc); + CERROR("%s: can't add procfs entry for accounting file %d\n", + qsd->qsd_svname, rc); GOTO(out, rc); } - EXIT; out: if (rc) @@ -253,7 +353,8 @@ out: /* * Release a qsd_instance. Companion of qsd_init(). This releases all data - * structures associated with the quota slave. + * structures associated with the quota slave (on-disk objects, lquota entry + * tables, ...). * This function should be called when the OSD is shutting down. * * \param env - is the environment passed by the caller @@ -265,7 +366,9 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd) ENTRY; CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname); + cfs_write_lock(&qsd->qsd_lock); qsd->qsd_stopping = true; + cfs_write_unlock(&qsd->qsd_lock); /* remove from the list of fsinfo */ if (!cfs_list_empty(&qsd->qsd_link)) { @@ -281,10 +384,30 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd) qsd->qsd_proc = NULL; } + /* stop the writeback thread */ + qsd_stop_upd_thread(qsd); + + /* shutdown the reintegration threads */ + for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) { + if (qsd->qsd_type_array[qtype] == NULL) + continue; + qsd_stop_reint_thread(qsd->qsd_type_array[qtype]); + } + + /* release reference on namespace */ + if (qsd->qsd_ns != NULL) { + ldlm_namespace_put(qsd->qsd_ns); + qsd->qsd_ns = NULL; + } + /* free per-quota type data */ for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) qsd_qtype_fini(env, qsd, qtype); + /* deregister connection to the quota master */ + qsd->qsd_exp_valid = false; + lustre_deregister_osp_item(&qsd->qsd_exp); + /* release per-filesystem information */ if (qsd->qsd_fsinfo != NULL) qsd_put_fsinfo(qsd->qsd_fsinfo); @@ -302,6 +425,7 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd) qsd->qsd_dev = NULL; } + CDEBUG(D_QUOTA, "%s: QSD shutdown completed\n", qsd->qsd_svname); OBD_FREE_PTR(qsd); EXIT; } @@ -334,8 +458,17 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname, if (qsd == NULL) RETURN(ERR_PTR(-ENOMEM)); + /* generic initializations */ cfs_rwlock_init(&qsd->qsd_lock); CFS_INIT_LIST_HEAD(&qsd->qsd_link); + thread_set_flags(&qsd->qsd_upd_thread, SVC_STOPPED); + cfs_waitq_init(&qsd->qsd_upd_thread.t_ctl_waitq); + CFS_INIT_LIST_HEAD(&qsd->qsd_upd_list); + cfs_spin_lock_init(&qsd->qsd_adjust_lock); + CFS_INIT_LIST_HEAD(&qsd->qsd_adjust_list); + qsd->qsd_prepared = false; + qsd->qsd_started = false; + /* copy service name */ strncpy(qsd->qsd_svname, svname, MAX_OBD_NAME); @@ -378,6 +511,7 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname, svname, rc); 
GOTO(out, rc);
 	}
+	EXIT;
 out:
 	if (rc) {
 		qsd_fini(env, qsd);
@@ -404,15 +538,29 @@ EXPORT_SYMBOL(qsd_init);
 */
 int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
 {
-	int	rc, qtype;
+	struct qsd_thread_info	*qti = qsd_info(env);
+	int			 qtype, rc = 0;
 	ENTRY;
 
 	LASSERT(qsd != NULL);
 
+	cfs_read_lock(&qsd->qsd_lock);
+	if (qsd->qsd_prepared) {
+		CERROR("%s: qsd instance already prepared\n", qsd->qsd_svname);
+		rc = -EALREADY;
+	}
+	cfs_read_unlock(&qsd->qsd_lock);
+	if (rc)
+		RETURN(rc);
+
 	/* Record whether this qsd instance is managing quota enforcement for a
 	 * MDT (i.e. inode quota) or OST (block quota) */
-	if (lu_device_is_md(qsd->qsd_dev->dd_lu_dev.ld_site->ls_top_dev))
+	if (lu_device_is_md(qsd->qsd_dev->dd_lu_dev.ld_site->ls_top_dev)) {
 		qsd->qsd_is_md = true;
+		qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_MD);
+	} else {
+		qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_DT);
+	}
 
 	/* look-up on-disk directory for the quota slave */
 	qsd->qsd_root = lquota_disk_dir_find_create(env, qsd->qsd_dev, NULL,
@@ -432,10 +580,96 @@ int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
 			RETURN(rc);
 	}
 
+	/* pools successfully set up, mark the qsd as prepared */
+	cfs_write_lock(&qsd->qsd_lock);
+	qsd->qsd_prepared = true;
+	cfs_write_unlock(&qsd->qsd_lock);
+
+	/* start reintegration thread for each type, if required */
+	for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+		struct qsd_qtype_info	*qqi = qsd->qsd_type_array[qtype];
+
+		rc = qsd_start_reint_thread(qqi);
+		if (rc) {
+			CERROR("%s: failed to start reint thread for type %s "
+			       "(%d)\n", qsd->qsd_svname, QTYPE_NAME(qtype),
+			       rc);
+			RETURN(rc);
+		}
+	}
+
+	/* start writeback thread */
+	rc = qsd_start_upd_thread(qsd);
+	if (rc) {
+		CERROR("%s: failed to start writeback thread (%d)\n",
+		       qsd->qsd_svname, rc);
+		RETURN(rc);
+	}
+
+	/* generate osp name */
+	rc = tgt_name2ospname((char *)qsd->qsd_svname, qti->qti_buf);
+	if (rc) {
+		CERROR("%s: failed to generate ospname (%d)\n",
+		       qsd->qsd_svname, rc);
+		RETURN(rc);
+	}
+
+	/* the connection callback will start the reintegration
+	 * procedure if quota is enabled */
+	rc = lustre_register_osp_item(qti->qti_buf, &qsd->qsd_exp,
+				      qsd_conn_callback, (void *)qsd);
+	if (rc) {
+		CERROR("%s: failed to get connection to master (%d)\n",
+		       qsd->qsd_svname, rc);
+		RETURN(rc);
+	}
+
 	RETURN(0);
 }
 EXPORT_SYMBOL(qsd_prepare);
 
+/*
+ * Start a qsd instance. This will complete the last step of the reintegration
+ * procedure as soon as possible (provided that the master is reachable).
+ * This should be called when recovery has been completed and quota should now
+ * be enforced on every operation.
+ *
+ * \param env - the environment passed by the caller
+ * \param qsd - is the qsd instance associated with the osd device to start
+ */
+int qsd_start(const struct lu_env *env, struct qsd_instance *qsd)
+{
+	int	type, rc = 0;
+	ENTRY;
+
+	cfs_write_lock(&qsd->qsd_lock);
+	if (!qsd->qsd_prepared) {
+		CERROR("%s: can't start qsd instance since it wasn't properly "
+		       "initialized\n", qsd->qsd_svname);
+		rc = -EFAULT;
+	} else if (qsd->qsd_started) {
+		CERROR("%s: qsd instance already started\n", qsd->qsd_svname);
+		rc = -EALREADY;
+	} else {
+		/* notify that the qsd_instance is now started */
+		qsd->qsd_started = true;
+	}
+	cfs_write_unlock(&qsd->qsd_lock);
+
+	if (rc)
+		RETURN(rc);
+
+	/* Trigger the 3rd step of reintegration: If usage > granted, acquire
+	 * up to usage; If usage < granted, release down to usage.
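+	 * For instance (figures assumed purely for illustration): a slave
+	 * still owning a 100MB grant for an ID whose local usage is only
+	 * 20MB will release 80MB back to the master, while a slave whose
+	 * usage exceeds its grant (e.g. after a crash) will acquire the
+	 * missing difference.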
*/ + for (type = USRQUOTA; type < MAXQUOTAS; type++) { + struct qsd_qtype_info *qqi = qsd->qsd_type_array[type]; + cfs_waitq_signal(&qqi->qqi_reint_thread.t_ctl_waitq); + } + + RETURN(rc); +} +EXPORT_SYMBOL(qsd_start); + void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg)); /* @@ -443,9 +677,16 @@ void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg)); */ int qsd_glb_init(void) { + int rc; + + rc = lu_kmem_init(qsd_caches); + if (rc) + return rc; + qsd_key_init_generic(&qsd_thread_key, NULL); lu_context_key_register(&qsd_thread_key); lustre_register_quota_process_config(qsd_process_config); + return 0; } @@ -455,5 +696,6 @@ int qsd_glb_init(void) void qsd_glb_fini(void) { lustre_register_quota_process_config(NULL); + lu_kmem_fini(qsd_caches); lu_context_key_degister(&qsd_thread_key); } diff --git a/lustre/quota/qsd_lock.c b/lustre/quota/qsd_lock.c index 7c17a62..ae916c7 100644 --- a/lustre/quota/qsd_lock.c +++ b/lustre/quota/qsd_lock.c @@ -227,10 +227,10 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data) /* valid race */ GOTO(out, rc = -ELDLM_NO_LOCK_DATA); - LCONSOLE_INFO("%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64 - " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname, - desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit, - desc->gl_softlimit); + CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64 + " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname, + desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit, + desc->gl_softlimit); if (desc->gl_ver == 0) { CERROR("%s: invalid global index version "LPU64"\n", @@ -380,8 +380,8 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) /* valid race */ GOTO(out, rc = -ELDLM_NO_LOCK_DATA); - LQUOTA_CONSOLE(lqe, "glimpse on quota locks, new qunit:"LPU64, - desc->gl_qunit); + LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64, + desc->gl_qunit); qsd = lqe2qqi(lqe)->qqi_qsd; @@ -400,14 +400,14 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) if (space > 0) { if (lqe->lqe_pending_req > 0) { - LQUOTA_ERROR(lqe, "request in flight, postpone " + LQUOTA_DEBUG(lqe, "request in flight, postpone " "release of "LPD64, space); lvb->lvb_id_may_rel = space; } else { lqe->lqe_pending_req++; /* release quota space in glimpse reply */ - LQUOTA_ERROR(lqe, "releasing "LPD64, space); + LQUOTA_DEBUG(lqe, "releasing "LPD64, space); lqe->lqe_granted -= space; lvb->lvb_id_rel = space; diff --git a/lustre/quota/qsd_reint.c b/lustre/quota/qsd_reint.c index fdbfdc8..040b885 100644 --- a/lustre/quota/qsd_reint.c +++ b/lustre/quota/qsd_reint.c @@ -36,11 +36,543 @@ #include "qsd_internal.h" +/* + * Completion function invoked when the global quota lock enqueue has completed + */ +static void qsd_reint_completion(const struct lu_env *env, + struct qsd_qtype_info *qqi, + struct quota_body *req_qbody, + struct quota_body *rep_qbody, + struct lustre_handle *lockh, + union ldlm_wire_lvb *lvb, + void *arg, int rc) +{ + struct qsd_instance *qsd = qqi->qqi_qsd; + __u64 *slv_ver = (__u64 *)arg; + ENTRY; + + if (rc) { + CERROR("%s: failed to enqueue global quota lock, glb " + "fid:"DFID", rc:%d\n", qsd->qsd_svname, + PFID(&req_qbody->qb_fid), rc); + RETURN_EXIT; + } + + CDEBUG(D_QUOTA, "%s: global quota lock successfully acquired, glb " + "fid:"DFID", glb ver:"LPU64", slv fid:"DFID", slv ver:"LPU64"\n", + qsd->qsd_svname, PFID(&req_qbody->qb_fid), + lvb->l_lquota.lvb_glb_ver, PFID(&rep_qbody->qb_slv_fid), + rep_qbody->qb_slv_ver); + + 
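/* on success, cache what the master reported: the slave index FID and + * version (compared against the local copies during reintegration) and + * the global lock handle for later reuse */ + 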
*slv_ver = rep_qbody->qb_slv_ver; + memcpy(&qqi->qqi_slv_fid, &rep_qbody->qb_slv_fid, + sizeof(struct lu_fid)); + lustre_handle_copy(&qqi->qqi_lockh, lockh); + EXIT; +} + +static int qsd_reint_qid(const struct lu_env *env, struct qsd_qtype_info *qqi, + bool global, union lquota_id *qid, void *rec) +{ + struct lquota_entry *lqe; + int rc; + ENTRY; + + lqe = lqe_locate(env, qqi->qqi_site, qid); + if (IS_ERR(lqe)) + RETURN(PTR_ERR(lqe)); + + rc = qsd_update_lqe(env, lqe, global, rec); + if (rc) + GOTO(out, rc); + + rc = qsd_update_index(env, qqi, qid, global, 0, rec); +out: + lqe_putref(lqe); + RETURN(rc); +} + +static int qsd_reint_entries(const struct lu_env *env, + struct qsd_qtype_info *qqi, + struct idx_info *ii, bool global, + cfs_page_t **pages, + unsigned int npages, bool need_swab) +{ + struct qsd_thread_info *qti = qsd_info(env); + union lquota_id *qid = &qti->qti_id; + int i, j, k, size; + int rc = 0; + ENTRY; + + /* sanity check on the record size */ + if ((global && ii->ii_recsize != sizeof(struct lquota_glb_rec)) || + (!global && ii->ii_recsize != sizeof(struct lquota_slv_rec))) { + CERROR("Invalid record size:%d, global:%s\n", + ii->ii_recsize, global ? "true" : "false"); + RETURN(-EINVAL); + } + + size = ii->ii_recsize + ii->ii_keysize + sizeof(__u64); + + for (i = 0; i < npages; i++) { + union lu_page *lip = cfs_kmap(pages[i]); + + for (j = 0; j < LU_PAGE_COUNT; j++) { + for (k = 0; k < lip->lp_idx.lip_nr; k++) { + char *entry; + + entry = lip->lp_idx.lip_entries + k * size; + memcpy(qid, entry, ii->ii_keysize); /* key */ + entry += ii->ii_keysize; /* value */ + + if (need_swab) { + int offset = 0; + + /* swab key */ + __swab64s(&qid->qid_uid); + /* quota records only include 64-bit + * fields */ + while (offset < ii->ii_recsize) { + __swab64s((__u64 *) + (entry + offset)); + offset += sizeof(__u64); + } + } + + rc = qsd_reint_qid(env, qqi, global, qid, + (void *)entry); + if (rc) + GOTO(out, rc); + } + lip++; + } +out: + cfs_kunmap(pages[i]); + if (rc) + break; + } + RETURN(rc); +} + +static int qsd_reint_index(const struct lu_env *env, struct qsd_qtype_info *qqi, + bool global) +{ + struct qsd_thread_info *qti = qsd_info(env); + struct qsd_instance *qsd = qqi->qqi_qsd; + struct idx_info *ii = &qti->qti_ii; + struct lu_fid *fid; + cfs_page_t **pages = NULL; + unsigned int npages, pg_cnt; + __u64 start_hash = 0, ver = 0; + bool need_swab = false; + int i, rc; + ENTRY; + + fid = global ? &qqi->qqi_fid : &qqi->qqi_slv_fid; + + /* let's do a 1MB bulk */ + npages = min_t(unsigned int, PTLRPC_MAX_BRW_SIZE, 1 << 20); + npages /= CFS_PAGE_SIZE; + + /* allocate pages for bulk index read */ + OBD_ALLOC(pages, npages * sizeof(*pages)); + if (pages == NULL) + GOTO(out, rc = -ENOMEM); + for (i = 0; i < npages; i++) { + pages[i] = cfs_alloc_page(CFS_ALLOC_STD); + if (pages[i] == NULL) + GOTO(out, rc = -ENOMEM); + } + +repeat: + /* initialize index_info request with FID of global index */ + memset(ii, 0, sizeof(*ii)); + memcpy(&ii->ii_fid, fid, sizeof(*fid)); + ii->ii_magic = IDX_INFO_MAGIC; + ii->ii_flags = II_FL_NOHASH; + ii->ii_count = npages * LU_PAGE_COUNT; + ii->ii_hash_start = start_hash; + + /* send bulk request to quota master to read global index */ + rc = qsd_fetch_index(env, qsd->qsd_exp, ii, npages, pages, &need_swab); + if (rc) { + CWARN("%s: failed to fetch index for "DFID". 
%d\n", + qsd->qsd_svname, PFID(fid), rc); + GOTO(out, rc); + } + + /* various sanity checks */ + if (ii->ii_magic != IDX_INFO_MAGIC) { + CERROR("%s: invalid magic in index transfer %x != %x\n", + qsd->qsd_svname, ii->ii_magic, IDX_INFO_MAGIC); + GOTO(out, rc = -EPROTO); + } + if ((ii->ii_flags & II_FL_VARKEY) != 0) + CWARN("%s: II_FL_VARKEY is set on index transfer for fid "DFID + ", it shouldn't be\n", qsd->qsd_svname, PFID(fid)); + if ((ii->ii_flags & II_FL_NONUNQ) != 0) + CWARN("%s: II_FL_NONUNQ is set on index transfer for fid "DFID + ", it shouldn't be\n", qsd->qsd_svname, PFID(fid)); + if (ii->ii_keysize != sizeof(__u64)) { + CERROR("%s: invalid key size reported on index transfer for " + "fid "DFID", %u != %u\n", qsd->qsd_svname, PFID(fid), + ii->ii_keysize, (int)sizeof(__u64)); + GOTO(out, rc = -EPROTO); + } + if (ii->ii_version == 0 && ii->ii_count != 0) + CWARN("%s: index version for fid "DFID" is 0, but index isn't " + "empty (%d)\n", qsd->qsd_svname, PFID(fid), ii->ii_count); + + CDEBUG(D_QUOTA, "%s: reintegration process for fid "DFID" successfully " + "fetched %s index, count = %d\n", qsd->qsd_svname, + PFID(fid), global ? "global" : "slave", ii->ii_count); + + if (start_hash == 0) + /* record version associated with the first bulk transfer */ + ver = ii->ii_version; + + pg_cnt = (ii->ii_count + (LU_PAGE_COUNT) - 1); + pg_cnt >>= CFS_PAGE_SHIFT - LU_PAGE_SHIFT; + + if (pg_cnt > npages) { + CERROR("%s: master returned more pages than expected, %u > %u" + "\n", qsd->qsd_svname, pg_cnt, npages); + pg_cnt = npages; + } + + rc = qsd_reint_entries(env, qqi, ii, global, pages, pg_cnt, need_swab); + if (rc) + GOTO(out, rc); + + if (ii->ii_hash_end != II_END_OFF) { + start_hash = ii->ii_hash_end; + goto repeat; + } +out: + if (pages != NULL) { + for (i = 0; i < npages; i++) + if (pages[i] != NULL) + cfs_free_page(pages[i]); + OBD_FREE(pages, npages * sizeof(*pages)); + } + + /* Update index version */ + if (rc == 0) { + rc = qsd_write_version(env, qqi, ver, global); + if (rc) + CERROR("%s: write version "LPU64" to "DFID" failed. " + "%d\n", qsd->qsd_svname, ver, PFID(fid), rc); + } + + RETURN(rc); +} + +static int qsd_reconciliation(const struct lu_env *env, + struct qsd_qtype_info *qqi) +{ + struct qsd_thread_info *qti = qsd_info(env); + struct qsd_instance *qsd = qqi->qqi_qsd; + const struct dt_it_ops *iops; + struct dt_it *it; + struct dt_key *key; + struct lquota_entry *lqe; + union lquota_id *qid = &qti->qti_id; + int rc; + ENTRY; + + LASSERT(qqi->qqi_glb_obj != NULL); + iops = &qqi->qqi_glb_obj->do_index_ops->dio_it; + + it = iops->init(env, qqi->qqi_glb_obj, 0, BYPASS_CAPA); + if (IS_ERR(it)) { + CWARN("%s: Initialize it for "DFID" failed. %ld\n", + qsd->qsd_svname, PFID(&qqi->qqi_fid), PTR_ERR(it)); + RETURN(PTR_ERR(it)); + } + + rc = iops->load(env, it, 0); + if (rc < 0) { + CWARN("%s: Load first entry for "DFID" failed. %d\n", + qsd->qsd_svname, PFID(&qqi->qqi_fid), rc); + GOTO(out, rc); + } else if (rc == 0) { + rc = iops->next(env, it); + if (rc != 0) + GOTO(out, rc = (rc < 0) ? rc : 0); + } + + do { + key = iops->key(env, it); + if (IS_ERR(key)) { + CWARN("%s: Error key for "DFID". %ld\n", + qsd->qsd_svname, PFID(&qqi->qqi_fid), + PTR_ERR(key)); + GOTO(out, rc = PTR_ERR(key)); + } + + /* skip the root user/group */ + if (*((__u64 *)key) == 0) + goto next; + + qid->qid_uid = *((__u64 *)key); + + lqe = lqe_locate(env, qqi->qqi_site, qid); + if (IS_ERR(lqe)) { + CWARN("%s: Fail to locate lqe. 
"DFID", %ld\n", + qsd->qsd_svname, PFID(&qqi->qqi_fid), + PTR_ERR(lqe)); + GOTO(out, rc = PTR_ERR(lqe)); + } + + if (!lqe->lqe_enforced) { + lqe_putref(lqe); + goto next; + } + + rc = qsd_refresh_usage(env, lqe); + if (rc) { + CWARN("%s: Fail to get usage. "DFID", %d\n", + qsd->qsd_svname, PFID(&qqi->qqi_fid), rc); + lqe_putref(lqe); + GOTO(out, rc); + } + + rc = qsd_dqacq(env, lqe, QSD_REP); + lqe_putref(lqe); + + if (rc) { + CWARN("%s: Fail to report quota. "DFID", %d\n", + qsd->qsd_svname, PFID(&qqi->qqi_fid), rc); + GOTO(out, rc); + } +next: + rc = iops->next(env, it); + if (rc < 0) + CWARN("%s: Error next "DFID". %d\n", qsd->qsd_svname, + PFID(&qqi->qqi_fid), rc); + } while (rc == 0); + + /* reach the end */ + if (rc > 0) + rc = 0; +out: + iops->put(env, it); + iops->fini(env, it); + RETURN(rc); +} + +static int qsd_connected(struct qsd_instance *qsd) +{ + int connected; + + cfs_read_lock(&qsd->qsd_lock); + connected = qsd->qsd_exp_valid ? 1 : 0; + cfs_read_unlock(&qsd->qsd_lock); + + return connected; +} + +static int qsd_started(struct qsd_instance *qsd) +{ + int started; + + cfs_read_lock(&qsd->qsd_lock); + started = qsd->qsd_started ? 1 : 0; + cfs_read_unlock(&qsd->qsd_lock); + + return started; +} + +/* + * Routine executed by the reintegration thread. + */ +static int qsd_reint_main(void *args) +{ + struct lu_env *env; + struct qsd_thread_info *qti; + struct qsd_qtype_info *qqi = (struct qsd_qtype_info *)args; + struct qsd_instance *qsd = qqi->qqi_qsd; + struct ptlrpc_thread *thread = &qqi->qqi_reint_thread; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + cfs_daemonize("qsd_reint"); + + CDEBUG(D_QUOTA, "%s: Starting reintegration thread for "DFID"\n", + qsd->qsd_svname, PFID(&qqi->qqi_fid)); + + qqi_getref(qqi); + lu_ref_add(&qqi->qqi_reference, "reint_thread", thread); + + thread_set_flags(thread, SVC_RUNNING); + cfs_waitq_signal(&thread->t_ctl_waitq); + + OBD_ALLOC_PTR(env); + if (env == NULL) + GOTO(out, rc = -ENOMEM); + + /* initialize environment */ + rc = lu_env_init(env, LCT_DT_THREAD); + if (rc) + GOTO(out_env, rc); + qti = qsd_info(env); + + /* wait for the connection to master established */ + l_wait_event(thread->t_ctl_waitq, + qsd_connected(qsd) || !thread_is_running(thread), &lwi); + + /* Step 1: enqueue global index lock */ + if (!thread_is_running(thread)) + GOTO(out_env_init, rc = 0); + + LASSERT(qsd->qsd_exp != NULL); + LASSERT(qqi->qqi_glb_uptodate == 0 || qqi->qqi_slv_uptodate == 0); + + memset(&qti->qti_lvb, 0, sizeof(qti->qti_lvb)); + + cfs_read_lock(&qsd->qsd_lock); + /* check whether we already own a global quota lock for this type */ + if (lustre_handle_is_used(&qqi->qqi_lockh) && + ldlm_lock_addref_try(&qqi->qqi_lockh, qsd_glb_einfo.ei_mode) == 0) { + cfs_read_unlock(&qsd->qsd_lock); + /* force refresh of global & slave index copy */ + qti->qti_lvb.l_lquota.lvb_glb_ver = ~0ULL; + qti->qti_slv_ver = ~0ULL; + } else { + /* no valid lock found, let's enqueue a new one */ + cfs_read_unlock(&qsd->qsd_lock); + + memset(&qti->qti_body, 0, sizeof(qti->qti_body)); + memcpy(&qti->qti_body.qb_fid, &qqi->qqi_fid, + sizeof(qqi->qqi_fid)); + + rc = qsd_intent_lock(env, qsd->qsd_exp, &qti->qti_body, true, + IT_QUOTA_CONN, qsd_reint_completion, qqi, + &qti->qti_lvb, (void *)&qti->qti_slv_ver); + if (rc) + GOTO(out_env_init, rc); + + CDEBUG(D_QUOTA, "%s: glb_ver:"LPU64"/"LPU64",slv_ver:"LPU64"/" + LPU64"\n", qsd->qsd_svname, + qti->qti_lvb.l_lquota.lvb_glb_ver, qqi->qqi_glb_ver, + qti->qti_slv_ver, qqi->qqi_slv_ver); + } + + /* Step 2: reintegrate global index 
*/ + if (!thread_is_running(thread)) + GOTO(out_lock, rc = 0); + + OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REINT, 10); + + if (qqi->qqi_glb_ver != qti->qti_lvb.l_lquota.lvb_glb_ver) { + rc = qsd_reint_index(env, qqi, true); + if (rc) { + CWARN("%s: reint global for "DFID" failed. %d\n", + qsd->qsd_svname, PFID(&qqi->qqi_fid), rc); + GOTO(out_lock, rc); + } + } else { + qsd_bump_version(qqi, qqi->qqi_glb_ver, true); + } + + /* Step 3: reintegrate slave index */ + if (!thread_is_running(thread)) + GOTO(out_lock, rc = 0); + + if (qqi->qqi_slv_ver != qti->qti_slv_ver) { + rc = qsd_reint_index(env, qqi, false); + if (rc) { + CWARN("%s: reint slave for "DFID" failed. %d\n", + qsd->qsd_svname, PFID(&qqi->qqi_slv_fid), rc); + GOTO(out_lock, rc); + } + } else { + qsd_bump_version(qqi, qqi->qqi_slv_ver, false); + } + + /* wait for the qsd instance to be started (recovery completed) */ + l_wait_event(thread->t_ctl_waitq, + qsd_started(qsd) || !thread_is_running(thread), &lwi); + + if (!thread_is_running(thread)) + GOTO(out_lock, rc = 0); + + /* Step 4: start reconciliation for each enforced ID */ + rc = qsd_reconciliation(env, qqi); + if (rc) + CWARN("%s: reconciliation failed. "DFID", %d\n", + qsd->qsd_svname, PFID(&qti->qti_fid), rc); + + EXIT; +out_lock: + ldlm_lock_decref(&qqi->qqi_lockh, qsd_glb_einfo.ei_mode); +out_env_init: + lu_env_fini(env); +out_env: + OBD_FREE_PTR(env); +out: + cfs_write_lock(&qsd->qsd_lock); + qqi->qqi_reint = 0; + cfs_write_unlock(&qsd->qsd_lock); + + qqi_putref(qqi); + lu_ref_del(&qqi->qqi_reference, "reint_thread", thread); + + thread_set_flags(thread, SVC_STOPPED); + cfs_waitq_signal(&thread->t_ctl_waitq); + return rc; +} + void qsd_stop_reint_thread(struct qsd_qtype_info *qqi) { + struct ptlrpc_thread *thread = &qqi->qqi_reint_thread; + struct l_wait_info lwi = { 0 }; + + if (!thread_is_stopped(thread)) { + thread_set_flags(thread, SVC_STOPPING); + cfs_waitq_signal(&thread->t_ctl_waitq); + + l_wait_event(thread->t_ctl_waitq, + thread_is_stopped(thread), &lwi); + } } int qsd_start_reint_thread(struct qsd_qtype_info *qqi) { + struct ptlrpc_thread *thread = &qqi->qqi_reint_thread; + struct qsd_instance *qsd = qqi->qqi_qsd; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + /* don't bother to do reintegration when quota isn't enabled */ + if (!qsd_type_enabled(qqi->qqi_qsd, qqi->qqi_qtype)) + RETURN(0); + + /* check if the reintegration has already started or finished */ + cfs_write_lock(&qsd->qsd_lock); + + if ((qqi->qqi_glb_uptodate && qqi->qqi_slv_uptodate) || + qqi->qqi_reint || qsd->qsd_stopping) { + cfs_write_unlock(&qsd->qsd_lock); + RETURN(0); + } + qqi->qqi_reint = 1; + + cfs_write_unlock(&qsd->qsd_lock); + + rc = cfs_create_thread(qsd_reint_main, (void *)qqi, 0); + if (rc < 0) { + thread_set_flags(thread, SVC_STOPPED); + cfs_write_lock(&qsd->qsd_lock); + qqi->qqi_reint = 0; + cfs_write_unlock(&qsd->qsd_lock); + RETURN(rc); + } + + l_wait_event(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread), + &lwi); RETURN(0); } diff --git a/lustre/quota/qsd_request.c b/lustre/quota/qsd_request.c index a0c830c..fa98b56 100644 --- a/lustre/quota/qsd_request.c +++ b/lustre/quota/qsd_request.c @@ -393,6 +393,7 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp, req->rq_bulk->bd_nob_transferred); if (rc < 0) GOTO(out, rc); + rc = 0; req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO); *ii = *req_ii; diff --git a/lustre/quota/qsd_writeback.c b/lustre/quota/qsd_writeback.c index 4ed599b..cafadd6 100644 --- 
a/lustre/quota/qsd_writeback.c +++ b/lustre/quota/qsd_writeback.c @@ -36,6 +36,135 @@ #include "qsd_internal.h" +extern cfs_mem_cache_t *upd_kmem; + +/* + * Allocate and fill a qsd_upd_rec structure to be processed by the writeback + * thread. + * + * \param qqi - is the qsd_qtype_info structure relevant to the update + * \param lqe - is the lquota entry subject to the update + * \param qid - is the identifier subject to the update + * \param rec - is the record storing the new quota settings + * \param ver - is the version associated with the update + * \param global - is a boolean set to true if this is an update of the global + * index and false for a slave index. + */ +static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi, + struct lquota_entry *lqe, + union lquota_id *qid, + union lquota_rec *rec, __u64 ver, + bool global) +{ + struct qsd_upd_rec *upd; + + OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, CFS_ALLOC_IO); + if (upd == NULL) { + CERROR("Failed to allocate upd\n"); + return NULL; + } + + /* fill it */ + CFS_INIT_LIST_HEAD(&upd->qur_link); + upd->qur_qqi = qqi; + upd->qur_lqe = lqe; + if (lqe) + lqe_getref(lqe); + upd->qur_qid = *qid; + upd->qur_rec = *rec; + upd->qur_ver = ver; + upd->qur_global = global; + + return upd; +} + +static void qsd_upd_free(struct qsd_upd_rec *upd) +{ + if (upd->qur_lqe) + lqe_putref(upd->qur_lqe); + OBD_SLAB_FREE_PTR(upd, upd_kmem); +} + +/* must hold the qsd_lock */ +static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd) +{ + if (!qsd->qsd_stopping) { + list_add_tail(&upd->qur_link, &qsd->qsd_upd_list); + /* wake up the upd thread */ + cfs_waitq_signal(&qsd->qsd_upd_thread.t_ctl_waitq); + } else { + CWARN("%s: discard deferred update.\n", qsd->qsd_svname); + if (upd->qur_lqe) + LQUOTA_WARN(upd->qur_lqe, "discard deferred update."); + qsd_upd_free(upd); + } +} + +/* must hold the qsd_lock */ +static void qsd_add_deferred(cfs_list_t *list, struct qsd_upd_rec *upd) +{ + struct qsd_upd_rec *tmp; + + /* Sort the updates in ascending order */ + cfs_list_for_each_entry_reverse(tmp, list, qur_link) { + + LASSERTF(upd->qur_ver != tmp->qur_ver, "ver:"LPU64"\n", + upd->qur_ver); + + if (upd->qur_ver < tmp->qur_ver) { + continue; + } else { + cfs_list_add_tail(&upd->qur_link, &tmp->qur_link); + return; + } + } + cfs_list_add(&upd->qur_link, list); +} + +/* must hold the qsd_lock */ +static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi, cfs_list_t *list, + __u64 ver) +{ + struct qsd_upd_rec *upd, *tmp; + ENTRY; + + /* Get the first update record in the list, which has the smallest + * version; discard all records whose version is not newer than the + * current one */ + cfs_list_for_each_entry_safe(upd, tmp, list, qur_link) { + if (upd->qur_ver <= ver) { + /* drop this update */ + cfs_list_del_init(&upd->qur_link); + CDEBUG(D_QUOTA, "%s: skipping deferred update ver:" + LPU64"/"LPU64", global:%d, qid:"LPU64"\n", + qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver, + upd->qur_global, upd->qur_qid.qid_uid); + qsd_upd_free(upd); + } else { + break; + } + } + + /* No remaining deferred update */ + if (cfs_list_empty(list)) + RETURN_EXIT; + + CDEBUG(D_QUOTA, "%s: found deferred update record. 
" + "version:"LPU64"/"LPU64", global:%d, qid:"LPU64"\n", + qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver, + upd->qur_global, upd->qur_qid.qid_uid); + + LASSERTF(upd->qur_ver > ver, "lur_ver:"LPU64", cur_ver:"LPU64"\n", + upd->qur_ver, ver); + + /* Kick off the deferred udpate */ + if (upd->qur_ver == ver + 1) { + list_del_init(&upd->qur_link); + qsd_upd_add(qqi->qqi_qsd, upd); + } + EXIT; +} + /* Bump version of global or slave index copy * * \param qqi - qsd_qtype_info @@ -44,6 +173,20 @@ */ void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global) { + cfs_list_t *list; + __u64 *idx_ver; + + idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver; + list = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv; + + cfs_write_lock(&qqi->qqi_qsd->qsd_lock); + *idx_ver = ver; + if (global) + qqi->qqi_glb_uptodate = 1; + else + qqi->qqi_slv_uptodate = 1; + qsd_kickoff_deferred(qqi, list, ver); + cfs_write_unlock(&qqi->qqi_qsd->qsd_lock); } /* @@ -60,4 +203,344 @@ void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe, union lquota_id *qid, union lquota_rec *rec, __u64 ver, bool global) { + struct qsd_upd_rec *upd; + struct qsd_instance *qsd = qqi->qqi_qsd; + __u64 cur_ver; + ENTRY; + + CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:"LPU64"\n", + qsd->qsd_svname, global ? "true" : "false", ver); + + upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global); + if (upd == NULL) + RETURN_EXIT; + + /* If we don't want update index version, no need to sort the + * records in version order, just schedule the updates instantly. */ + if (ver == 0) { + cfs_write_lock(&qsd->qsd_lock); + qsd_upd_add(qsd, upd); + cfs_write_unlock(&qsd->qsd_lock); + RETURN_EXIT; + } + + cfs_write_lock(&qsd->qsd_lock); + + cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver; + + if (ver <= cur_ver) { + if (global) + /* legitimate race between glimpse AST and + * reintegration */ + CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse" + " ver:"LPU64" local ver:"LPU64"\n", + qsd->qsd_svname, ver, cur_ver); + else + CERROR("%s: discard slv update, ver:"LPU64" local ver:" + LPU64"\n", qsd->qsd_svname, ver, cur_ver); + qsd_upd_free(upd); + } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate && + qqi->qqi_slv_uptodate) { + /* In order update, and reintegration has been done. */ + qsd_upd_add(qsd, upd); + } else { + /* Out of order update (the one with smaller version hasn't + * reached slave or hasn't been flushed to disk yet), or + * the reintegration is in progress. Defer the update. */ + cfs_list_t *list = global ? &qqi->qqi_deferred_glb : + &qqi->qqi_deferred_slv; + qsd_add_deferred(list, upd); + } + + cfs_write_unlock(&qsd->qsd_lock); + + EXIT; +} + +static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd) +{ + struct lquota_entry *lqe = upd->qur_lqe; + struct qsd_qtype_info *qqi = upd->qur_qqi; + int rc; + ENTRY; + + if (lqe == NULL) { + lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid); + if (IS_ERR(lqe)) + GOTO(out, rc = PTR_ERR(lqe)); + } + + /* The in-memory lqe update for slave index copy isn't deferred, + * we shouldn't touch it here. */ + if (upd->qur_global) { + rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec); + if (rc) + GOTO(out, rc); + /* Report usage asynchronously */ + if (lqe->lqe_enforced && + !qsd_refresh_usage(env, lqe)) { + rc = qsd_dqacq(env, lqe, QSD_REP); + LQUOTA_DEBUG(lqe, "Report usage. 
rc:%d", rc); + } + } + + rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global, + upd->qur_ver, &upd->qur_rec); +out: + if (lqe && !IS_ERR(lqe)) { + lqe_putref(lqe); + upd->qur_lqe = NULL; + } + RETURN(rc); +} + +void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel) +{ + struct qsd_instance *qsd = lqe2qqi(lqe)->qqi_qsd; + bool added = false; + + lqe_getref(lqe); + cfs_spin_lock(&qsd->qsd_adjust_lock); + + /* the lqe is being queued for the per-ID lock cancel, we should + * cancel the lock cancel and re-add it for quota adjust */ + if (!cfs_list_empty(&lqe->lqe_link) && + lqe->lqe_adjust_time == 0) { + cfs_list_del_init(&lqe->lqe_link); + lqe_putref(lqe); + } + + if (cfs_list_empty(&lqe->lqe_link)) { + if (cancel) + lqe->lqe_adjust_time = 0; + else + lqe->lqe_adjust_time = defer ? + cfs_time_shift_64(QSD_WB_INTERVAL) : + cfs_time_current_64(); + /* lqe reference transfered to list */ + if (defer) + cfs_list_add_tail(&lqe->lqe_link, + &qsd->qsd_adjust_list); + else + cfs_list_add(&lqe->lqe_link, &qsd->qsd_adjust_list); + added = true; + } + cfs_spin_unlock(&qsd->qsd_adjust_lock); + + if (added) + cfs_waitq_signal(&qsd->qsd_upd_thread.t_ctl_waitq); + else + lqe_putref(lqe); +} + +/* return true if there is pending writeback records or the pending + * adjust requests */ +static bool qsd_job_pending(struct qsd_instance *qsd, cfs_list_t *upd, + bool *uptodate) +{ + bool job_pending = false; + int qtype; + + LASSERT(cfs_list_empty(upd)); + *uptodate = true; + + cfs_spin_lock(&qsd->qsd_adjust_lock); + if (!cfs_list_empty(&qsd->qsd_adjust_list)) { + struct lquota_entry *lqe; + lqe = cfs_list_entry(qsd->qsd_adjust_list.next, + struct lquota_entry, lqe_link); + if (cfs_time_beforeq_64(lqe->lqe_adjust_time, + cfs_time_current_64())) + job_pending = true; + } + cfs_spin_unlock(&qsd->qsd_adjust_lock); + + cfs_write_lock(&qsd->qsd_lock); + if (!cfs_list_empty(&qsd->qsd_upd_list)) { + cfs_list_splice_init(&qsd->qsd_upd_list, upd); + job_pending = true; + } + + for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) { + struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype]; + + if (!qsd_type_enabled(qsd, qtype)) + continue; + + if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) + *uptodate = false; + } + + cfs_write_unlock(&qsd->qsd_lock); + return job_pending; +} + +static int qsd_upd_thread(void *arg) +{ + struct qsd_instance *qsd = (struct qsd_instance *)arg; + struct ptlrpc_thread *thread = &qsd->qsd_upd_thread; + struct l_wait_info lwi; + cfs_list_t queue; + struct qsd_upd_rec *upd, *n; + char pname[MTI_NAME_MAXLEN]; + struct lu_env *env; + int qtype, rc = 0; + bool uptodate; + struct lquota_entry *lqe, *tmp; + __u64 cur_time; + ENTRY; + + OBD_ALLOC_PTR(env); + if (env == NULL) + RETURN(-ENOMEM); + + rc = lu_env_init(env, LCT_DT_THREAD); + if (rc) { + CERROR("%s: Fail to init env.", qsd->qsd_svname); + OBD_FREE_PTR(env); + RETURN(rc); + } + + snprintf(pname, MTI_NAME_MAXLEN, "lquota_wb_%s", qsd->qsd_svname); + cfs_daemonize(pname); + + thread_set_flags(thread, SVC_RUNNING); + cfs_waitq_signal(&thread->t_ctl_waitq); + + CFS_INIT_LIST_HEAD(&queue); + lwi = LWI_TIMEOUT(cfs_time_seconds(QSD_WB_INTERVAL), NULL, NULL); + while (1) { + l_wait_event(thread->t_ctl_waitq, + qsd_job_pending(qsd, &queue, &uptodate) || + !thread_is_running(thread), &lwi); + + cfs_list_for_each_entry_safe(upd, n, &queue, qur_link) { + cfs_list_del_init(&upd->qur_link); + qsd_process_upd(env, upd); + qsd_upd_free(upd); + } + + cfs_spin_lock(&qsd->qsd_adjust_lock); + cur_time = cfs_time_current_64(); + 
cfs_list_for_each_entry_safe(lqe, tmp, &qsd->qsd_adjust_list, + lqe_link) { + /* deferred items are sorted by time */ + if (!cfs_time_beforeq_64(lqe->lqe_adjust_time, + cur_time)) + break; + + cfs_list_del_init(&lqe->lqe_link); + cfs_spin_unlock(&qsd->qsd_adjust_lock); + + if (thread_is_running(thread) && uptodate) { + qsd_refresh_usage(env, lqe); + if (lqe->lqe_adjust_time == 0) + qsd_id_lock_cancel(env, lqe); + else + qsd_dqacq(env, lqe, QSD_ADJ); + } + + lqe_putref(lqe); + cfs_spin_lock(&qsd->qsd_adjust_lock); + } + cfs_spin_unlock(&qsd->qsd_adjust_lock); + + if (!thread_is_running(thread)) + break; + + if (uptodate) + continue; + + for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) + qsd_start_reint_thread(qsd->qsd_type_array[qtype]); + } + lu_env_fini(env); + OBD_FREE_PTR(env); + thread_set_flags(thread, SVC_STOPPED); + cfs_waitq_signal(&thread->t_ctl_waitq); + RETURN(rc); +} + +int qsd_start_upd_thread(struct qsd_instance *qsd) +{ + struct ptlrpc_thread *thread = &qsd->qsd_upd_thread; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + rc = cfs_create_thread(qsd_upd_thread, (void *)qsd, 0); + if (rc < 0) { + CERROR("Failed to start quota update thread: %d\n", rc); + thread_set_flags(thread, SVC_STOPPED); + RETURN(rc); + } + + l_wait_event(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread), + &lwi); + RETURN(0); +} + +static void qsd_cleanup_deferred(struct qsd_instance *qsd) +{ + int qtype; + + for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) { + struct qsd_upd_rec *upd, *tmp; + struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype]; + + if (qqi == NULL) + continue; + + cfs_write_lock(&qsd->qsd_lock); + cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb, + qur_link) { + CWARN("%s: Freeing global deferred upd: ID:"LPU64", " + "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname, + upd->qur_qid.qid_uid, upd->qur_ver, + qqi->qqi_glb_ver); + list_del_init(&upd->qur_link); + qsd_upd_free(upd); + } + cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv, + qur_link) { + CWARN("%s: Freeing slave deferred upd: ID:"LPU64", " + "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname, + upd->qur_qid.qid_uid, upd->qur_ver, + qqi->qqi_slv_ver); + list_del_init(&upd->qur_link); + qsd_upd_free(upd); + } + cfs_write_unlock(&qsd->qsd_lock); + } +} + +static void qsd_cleanup_adjust(struct qsd_instance *qsd) +{ + struct lquota_entry *lqe; + + cfs_spin_lock(&qsd->qsd_adjust_lock); + while (!cfs_list_empty(&qsd->qsd_adjust_list)) { + lqe = cfs_list_entry(qsd->qsd_adjust_list.next, + struct lquota_entry, lqe_link); + cfs_list_del_init(&lqe->lqe_link); + lqe_putref(lqe); + } + cfs_spin_unlock(&qsd->qsd_adjust_lock); +} + +void qsd_stop_upd_thread(struct qsd_instance *qsd) +{ + struct ptlrpc_thread *thread = &qsd->qsd_upd_thread; + struct l_wait_info lwi = { 0 }; + + if (!thread_is_stopped(thread)) { + thread_set_flags(thread, SVC_STOPPING); + cfs_waitq_signal(&thread->t_ctl_waitq); + + l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread), + &lwi); + } + qsd_cleanup_deferred(qsd); + qsd_cleanup_adjust(qsd); } -- 1.8.3.1
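The version-ordering scheme implemented by qsd_upd_schedule(), qsd_add_deferred() and qsd_kickoff_deferred() above can be modelled by the following self-contained userspace sketch (simplified: updates are applied synchronously here, whereas the patch hands them to the writeback thread). Everything below is illustrative only; none of these names exist in the Lustre tree.

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

/* one deferred update, kept on a singly-linked list sorted by
 * ascending version */
struct upd {
	uint64_t	 ver;
	struct upd	*next;
};

static struct upd	*deferred;	/* sorted deferred list */
static uint64_t		 cur_ver;	/* last applied version */

static void apply(uint64_t ver)
{
	printf("apply version %llu\n", (unsigned long long)ver);
	cur_ver = ver;
}

/* insert in ascending version order, like qsd_add_deferred() */
static void add_deferred(uint64_t ver)
{
	struct upd **p = &deferred;
	struct upd  *u = malloc(sizeof(*u));

	if (u == NULL)
		abort();
	u->ver = ver;
	while (*p != NULL && (*p)->ver < ver)
		p = &(*p)->next;
	u->next = *p;
	*p = u;
}

/* drop stale records, then replay consecutive ones, like
 * qsd_kickoff_deferred() */
static void kick_deferred(void)
{
	while (deferred != NULL && deferred->ver <= cur_ver) {
		struct upd *u = deferred;

		deferred = u->next;
		free(u);		/* stale, skip it */
	}
	while (deferred != NULL && deferred->ver == cur_ver + 1) {
		struct upd *u = deferred;

		apply(u->ver);
		deferred = u->next;
		free(u);
	}
}

/* accept an incoming update, like qsd_upd_schedule() */
static void schedule(uint64_t ver)
{
	if (ver <= cur_ver)
		return;			/* stale, discard */
	if (ver == cur_ver + 1) {
		apply(ver);
		kick_deferred();	/* the gap may just have closed */
	} else {
		add_deferred(ver);	/* out of order, park it */
	}
}

int main(void)
{
	schedule(2);	/* deferred: version 1 still missing */
	schedule(3);	/* deferred as well */
	schedule(1);	/* applies 1, then replays 2 and 3 */
	return 0;
}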