From 2d61b338d9990f2098ca646ecfe818668579ca76 Mon Sep 17 00:00:00 2001
From: Johann Lombardi
Date: Wed, 3 Oct 2012 15:02:42 +0200
Subject: [PATCH] LU-1842 quota: add quota locks support on QMT

This patch adds methods for quota lock lvb management and implements an
intent policy for quota locks. It also introduces the rebalance thread,
which is in charge of sending glimpse callbacks to slaves to notify them
of new quota settings.

This patch also enables sanity-quota.sh test 36, which should now pass.

Signed-off-by: Johann Lombardi
Change-Id: I0cf7f86d480296f7e8fe774ed8937aae27736f88
Reviewed-on: http://review.whamcloud.com/4166
Tested-by: Hudson
Tested-by: Maloo
---
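A minimal sketch of how the two notification helpers added by this patch
are meant to be combined on the master, based on the declarations added to
qmt_internal.h below. The function example_setquota_notify() is
hypothetical and not part of the patch; error handling is omitted.

	#include "qmt_internal.h"

	static void example_setquota_notify(const struct lu_env *env,
					    struct qmt_device *qmt,
					    struct lquota_entry *lqe,
					    __u64 ver)
	{
		/* push the new limits and the new global index version to
		 * slaves holding the global quota lock */
		qmt_glb_lock_notify(env, lqe, ver);

		/* queue the entry for the rebalance thread, which will
		 * glimpse per-ID locks to broadcast qunit/edquot changes */
		qmt_id_lock_notify(qmt, lqe);
	}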
 lustre/mdt/mdt_handler.c     |   4 +-
 lustre/quota/qmt_dev.c       |  29 +-
 lustre/quota/qmt_handler.c   |   3 +
 lustre/quota/qmt_internal.h  |  13 +
 lustre/quota/qmt_lock.c      | 632 ++++++++++++++++++++++++++++++++++++++++++-
 lustre/tests/Makefile.am     |   1 +
 lustre/tests/sanity-quota.sh |  18 +-
 7 files changed, 672 insertions(+), 28 deletions(-)

diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c
index 4b4882b..1a89c83 100644
--- a/lustre/mdt/mdt_handler.c
+++ b/lustre/mdt/mdt_handler.c
@@ -5035,8 +5035,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
 	mdt_procfs_fini(m);
 
-	mdt_quota_fini(env, m);
-
 	lut_fini(env, &m->mdt_lut);
 	mdt_fs_cleanup(env, m);
 	upcall_cache_cleanup(m->mdt_identity_cache);
@@ -5048,6 +5046,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 		d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
 	}
 
+	mdt_quota_fini(env, m);
+
 	cfs_free_nidlist(&m->mdt_nosquash_nids);
 	if (m->mdt_nosquash_str) {
 		OBD_FREE(m->mdt_nosquash_str, m->mdt_nosquash_strlen);
diff --git a/lustre/quota/qmt_dev.c b/lustre/quota/qmt_dev.c
index 8512035..685f50f 100644
--- a/lustre/quota/qmt_dev.c
+++ b/lustre/quota/qmt_dev.c
@@ -110,6 +110,9 @@ static struct lu_device *qmt_device_fini(const struct lu_env *env,
 		qmt->qmt_proc = NULL;
 	}
 
+	/* stop rebalance thread */
+	qmt_stop_reba_thread(qmt);
+
 	/* disconnect from OSD */
 	if (qmt->qmt_child_exp != NULL) {
 		obd_disconnect(qmt->qmt_child_exp);
@@ -117,12 +120,9 @@ static struct lu_device *qmt_device_fini(const struct lu_env *env,
 		qmt->qmt_child = NULL;
 	}
 
-	/* release reference on MDT namespace */
-	if (ld->ld_obd->obd_namespace != NULL) {
-		ldlm_namespace_put(ld->ld_obd->obd_namespace);
-		ld->ld_obd->obd_namespace = NULL;
-		qmt->qmt_ns = NULL;
-	}
+	/* clear references to MDT namespace */
+	ld->ld_obd->obd_namespace = NULL;
+	qmt->qmt_ns = NULL;
 
 	RETURN(NULL);
 }
@@ -227,10 +227,9 @@ static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt,
 	if (mdt_obd == NULL)
 		RETURN(-ENOENT);
 
-	/* grab reference on MDT namespace. kind of a hack until we have our
-	 * own namespace & service threads */
+	/* borrow MDT namespace. kind of a hack until we have our own namespace
+	 * & service threads */
 	LASSERT(mdt_obd->obd_namespace != NULL);
-	ldlm_namespace_get(mdt_obd->obd_namespace);
 	obd->obd_namespace = mdt_obd->obd_namespace;
 	qmt->qmt_ns = obd->obd_namespace;
 
@@ -239,6 +238,18 @@ static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt,
 	if (rc)
 		GOTO(out, rc);
 
+	/* set up and start rebalance thread */
+	thread_set_flags(&qmt->qmt_reba_thread, SVC_STOPPED);
+	cfs_waitq_init(&qmt->qmt_reba_thread.t_ctl_waitq);
+	CFS_INIT_LIST_HEAD(&qmt->qmt_reba_list);
+	cfs_spin_lock_init(&qmt->qmt_reba_lock);
+	rc = qmt_start_reba_thread(qmt);
+	if (rc) {
+		CERROR("%s: failed to start rebalance thread (%d)\n",
+		       qmt->qmt_svname, rc);
+		GOTO(out, rc);
+	}
+
 	/* at the moment there is no linkage between lu_type and obd_type, so
 	 * we lookup obd_type this way */
 	type = class_search_type(LUSTRE_QMT_NAME);
diff --git a/lustre/quota/qmt_handler.c b/lustre/quota/qmt_handler.c
index 19e706f..9a4c416 100644
--- a/lustre/quota/qmt_handler.c
+++ b/lustre/quota/qmt_handler.c
@@ -291,6 +291,9 @@ out_nolock:
 	if (th != NULL && !IS_ERR(th))
 		dt_trans_stop(env, qmt->qmt_child, th);
 
+	if (rc == 0 && bump_version)
+		qmt_glb_lock_notify(env, lqe, ver);
+
 	return rc;
 }
 
diff --git a/lustre/quota/qmt_internal.h b/lustre/quota/qmt_internal.h
index 7802006..e3291c6 100644
--- a/lustre/quota/qmt_internal.h
+++ b/lustre/quota/qmt_internal.h
@@ -73,6 +73,15 @@ struct qmt_device {
 	/* procfs root directory for this qmt */
 	cfs_proc_dir_entry_t	*qmt_proc;
 
+	/* dedicated thread in charge of space rebalancing */
+	struct ptlrpc_thread	 qmt_reba_thread;
+
+	/* list of lqe entries which need space rebalancing */
+	cfs_list_t		 qmt_reba_list;
+
+	/* lock protecting rebalancing list */
+	cfs_spinlock_t		 qmt_reba_lock;
+
 	unsigned long		 qmt_stopping:1; /* qmt is stopping */
 };
 
@@ -252,4 +261,8 @@ int qmt_lvbo_update(struct lu_device *, struct ldlm_resource *,
 int qmt_lvbo_size(struct lu_device *, struct ldlm_lock *);
 int qmt_lvbo_fill(struct lu_device *, struct ldlm_lock *, void *, int);
 int qmt_lvbo_free(struct lu_device *, struct ldlm_resource *);
+int qmt_start_reba_thread(struct qmt_device *);
+void qmt_stop_reba_thread(struct qmt_device *);
+void qmt_glb_lock_notify(const struct lu_env *, struct lquota_entry *, __u64);
+void qmt_id_lock_notify(struct qmt_device *, struct lquota_entry *);
 #endif /* _QMT_INTERNAL_H */
diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c
index 18730cd..e37d041 100644
--- a/lustre/quota/qmt_lock.c
+++ b/lustre/quota/qmt_lock.c
@@ -35,6 +35,8 @@
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include <lustre_dlm.h>
+#include <obd_class.h>
+
 #include "qmt_internal.h"
 
 /* intent policy function called from mdt_intent_opc() when the intent is of
@@ -43,10 +45,68 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
 		      struct ptlrpc_request *req, struct ldlm_lock **lockp,
 		      int flags)
 {
+	struct qmt_device	*qmt = lu2qmt_dev(ld);
+	struct ldlm_intent	*it;
+	struct quota_body	*reqbody;
+	struct quota_body	*repbody;
+	struct obd_uuid		*uuid;
+	struct lquota_lvb	*lvb;
+	int			 rc;
 	ENTRY;
 
 	req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
-	RETURN(ELDLM_LOCK_ABORTED);
+
+	/* extract quota body and intent opc */
+	it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+	if (it == NULL)
+		RETURN(err_serious(-EFAULT));
+
+	reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+	if (reqbody == NULL)
+		RETURN(err_serious(-EFAULT));
+
+	/* prepare reply */
+	rc = req_capsule_server_pack(&req->rq_pill);
+	if (rc != 0) {
+		CERROR("Can't pack response, rc %d\n", rc);
+		RETURN(err_serious(rc));
+	}
+
+	repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
+	if (repbody == NULL)
+		RETURN(err_serious(-EFAULT));
+
+	uuid = &(*lockp)->l_export->exp_client_uuid;
+	switch (it->opc) {
+
+	case IT_QUOTA_DQACQ:
+		/* XXX: to be added in a next patch */
+		GOTO(out, rc = -EOPNOTSUPP);
+		break;
+
+	case IT_QUOTA_CONN:
+		/* new connection from slave */
+		rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
+				       &repbody->qb_slv_fid,
+				       &repbody->qb_slv_ver, uuid);
+		if (rc)
+			GOTO(out, rc);
+		break;
+
+	default:
+		CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
+		       it->opc);
+		GOTO(out, rc = err_serious(-EINVAL));
+	}
+
+	/* on success, pack lvb in reply */
+	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+			     ldlm_lvbo_size(*lockp));
+	lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+	ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp));
+	EXIT;
+out:
+	return rc;
 }
 
 /*
@@ -55,43 +115,595 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
  */
 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
 {
-	return 0;
+	struct lu_env		*env;
+	struct qmt_thread_info	*qti;
+	struct qmt_device	*qmt = lu2qmt_dev(ld);
+	int			 pool_id, pool_type, qtype;
+	int			 rc;
+	ENTRY;
+
+	LASSERT(res != NULL);
+
+	if (res->lr_type != LDLM_PLAIN)
+		RETURN(-ENOTSUPP);
+
+	if (res->lr_lvb_data ||
+	    res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
+		RETURN(0);
+
+	OBD_ALLOC_PTR(env);
+	if (env == NULL)
+		RETURN(-ENOMEM);
+
+	/* initialize environment */
+	rc = lu_env_init(env, LCT_MD_THREAD);
+	if (rc) {
+		OBD_FREE_PTR(env);
+		RETURN(rc);
+	}
+	qti = qmt_info(env);
+
+	/* extract global index FID and quota identifier */
+	fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id);
+
+	/* sanity check the global index FID */
+	rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
+	if (rc) {
+		CERROR("can't extract pool information from FID "DFID"\n",
+		       PFID(&qti->qti_fid));
+		GOTO(out, rc);
+	}
+
+	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
+		/* no ID quota lock is associated with UID/GID 0, so a non-zero
+		 * value here means that we are dealing with an ID lock. */
+		struct lquota_entry	*lqe;
+
+		/* Find the quota entry associated with the quota id */
+		lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
+					  &qti->qti_id);
+		if (IS_ERR(lqe))
+			GOTO(out, rc = PTR_ERR(lqe));
+
+		/* store reference to lqe in lr_lvb_data */
+		res->lr_lvb_data = lqe;
+		LQUOTA_DEBUG(lqe, "initialized res lvb");
+	} else {
+		struct dt_object	*obj;
+
+		/* lookup global index */
+		obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
+		if (IS_ERR(obj))
+			GOTO(out, rc = PTR_ERR(obj));
+		if (!dt_object_exists(obj)) {
+			lu_object_put(env, &obj->do_lu);
+			GOTO(out, rc = -ENOENT);
+		}
+
+		/* store reference to global index object in lr_lvb_data */
+		res->lr_lvb_data = obj;
+		CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
+	}
+
+	res->lr_lvb_len = sizeof(struct lquota_lvb);
+	EXIT;
+out:
+	lu_env_fini(env);
+	OBD_FREE_PTR(env);
+	return rc;
 }
 
 /*
  * Update LVB associated with the global quota index.
  * This function is called from the DLM itself after a glimpse callback, in this
- * case valid ptlrpc request is passed. It is also called directly from the
- * quota master in order to refresh the global index version after a quota limit
- * change.
+ * case a valid ptlrpc request is passed.
  */
 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
 		    struct ptlrpc_request *req, int increase_only)
 {
-	return 0;
+	struct lu_env		*env;
+	struct qmt_thread_info	*qti;
+	struct qmt_device	*qmt = lu2qmt_dev(ld);
+	struct lquota_entry	*lqe;
+	struct lquota_lvb	*lvb;
+	int			 rc = 0;
+	ENTRY;
+
+	LASSERT(res != NULL);
+
+	if (req == NULL)
+		RETURN(0);
+
+	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
+		/* no need to update lvb for global quota locks */
+		RETURN(0);
+
+	lqe = res->lr_lvb_data;
+	LASSERT(lqe != NULL);
+
+	/* allocate environment */
+	OBD_ALLOC_PTR(env);
+	if (env == NULL)
+		RETURN(-ENOMEM);
+
+	/* initialize environment */
+	rc = lu_env_init(env, LCT_MD_THREAD);
+	if (rc) {
+		OBD_FREE_PTR(env);
+		RETURN(rc);
+	}
+	qti = qmt_info(env);
+
+	lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+	if (lvb == NULL) {
+		CERROR("%s: failed to extract lvb from request\n",
+		       qmt->qmt_svname);
+		GOTO(out, rc = -EFAULT);
+	}
+
+	/* XXX: Space release handling to be added in a next patch */
+
+	EXIT;
+out:
+	lu_env_fini(env);
+	OBD_FREE_PTR(env);
+	return rc;
 }
 
 /*
  * Report size of lvb to ldlm layer in order to allocate lvb buffer
+ * As far as quota locks are concerned, the size is static and is the same
+ * for both global and per-ID locks which share the same lvb format.
  */
 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
 {
-	return 0;
+	return sizeof(struct lquota_lvb);
 }
 
 /*
- * Fill request buffer with lvb
+ * Fill request buffer with quota lvb
  */
 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
 		  int lvblen)
 {
-	return 0;
+	struct ldlm_resource	*res = lock->l_resource;
+	struct lquota_lvb	*qlvb = lvb;
+	ENTRY;
+
+	LASSERT(res != NULL);
+
+	if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
+	    res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
+		RETURN(-EINVAL);
+
+	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
+		/* no ID quota lock is associated with UID/GID 0, so a non-zero
+		 * value here means that we are dealing with an ID lock. */
+		struct lquota_entry	*lqe = res->lr_lvb_data;
+
+		/* return current qunit value & edquot flags in lvb */
+		lqe_getref(lqe);
+		qlvb->lvb_id_qunit = lqe->lqe_qunit;
+		qlvb->lvb_flags = 0;
+		if (lqe->lqe_edquot)
+			qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+		lqe_putref(lqe);
+	} else {
+		/* global quota lock */
+		struct lu_env		*env;
+		int			 rc;
+		struct dt_object	*obj = res->lr_lvb_data;
+
+		OBD_ALLOC_PTR(env);
+		if (env == NULL)
+			RETURN(-ENOMEM);
+
+		/* initialize environment */
+		rc = lu_env_init(env, LCT_LOCAL);
+		if (rc) {
+			OBD_FREE_PTR(env);
+			RETURN(rc);
+		}
+
+		/* return current version of global index */
+		qlvb->lvb_glb_ver = dt_version_get(env, obj);
+
+		lu_env_fini(env);
+		OBD_FREE_PTR(env);
+	}
+
+	RETURN(sizeof(struct lquota_lvb));
 }
 
 /*
  * Free lvb associated with a given ldlm resource
+ * We don't really allocate an lvb; lr_lvb_data just points to
+ * the appropriate backend structures.
  */
 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
 {
-	return 0;
+	ENTRY;
+
+	if (res->lr_lvb_data == NULL)
+		RETURN(0);
+
+	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
+		struct lquota_entry	*lqe = res->lr_lvb_data;
+
+		/* release lqe reference */
+		lqe_putref(lqe);
+	} else {
+		struct dt_object	*obj = res->lr_lvb_data;
+		struct lu_env		*env;
+		int			 rc;
+
+		OBD_ALLOC_PTR(env);
+		if (env == NULL)
+			RETURN(-ENOMEM);
+
+		/* initialize environment */
+		rc = lu_env_init(env, LCT_LOCAL);
+		if (rc) {
+			OBD_FREE_PTR(env);
+			RETURN(rc);
+		}
+
+		/* release object reference */
+		lu_object_put(env, &obj->do_lu);
+		lu_env_fini(env);
+		OBD_FREE_PTR(env);
+	}
+
+	res->lr_lvb_data = NULL;
+	res->lr_lvb_len = 0;
+
+	RETURN(0);
+}
+
+typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
+				struct obd_uuid *, union ldlm_gl_desc *,
+				void *);
+/*
+ * Send glimpse callback to slaves holding a lock on resource \res.
+ * This is used to notify slaves of new quota settings or to claim quota space
+ * back.
+ *
+ * \param env  - is the environment passed by the caller
+ * \param qmt  - is the quota master target
+ * \param res  - is the dlm resource associated with the quota object
+ * \param desc - is the glimpse descriptor to pack in glimpse callback
+ * \param cb   - is the callback function called on every lock and determines
+ *               whether a glimpse should be issued
+ * \param arg  - is an opaque parameter passed to the callback function
+ */
+static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
+			    struct ldlm_resource *res, union ldlm_gl_desc *desc,
+			    qmt_glimpse_cb_t cb, void *arg)
+{
+	cfs_list_t	*tmp, *pos;
+	CFS_LIST_HEAD(gl_list);
+	int		 rc = 0;
+	ENTRY;
+
+	lock_res(res);
+	/* scan list of granted locks */
+	cfs_list_for_each(pos, &res->lr_granted) {
+		struct ldlm_glimpse_work	*work;
+		struct ldlm_lock		*lock;
+		struct obd_uuid			*uuid;
+
+		lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
+		LASSERT(lock->l_export);
+		uuid = &lock->l_export->exp_client_uuid;
+
+		if (cb != NULL) {
+			rc = cb(env, qmt, uuid, desc, arg);
+			if (rc == 0)
+				/* slave should not be notified */
+				continue;
+			if (rc < 0)
+				/* something went wrong, we still notify */
+				CERROR("%s: callback function failed to "
+				       "determine whether slave %s should be "
+				       "notified (%d)\n", qmt->qmt_svname,
+				       obd_uuid2str(uuid), rc);
+		}
+
+		OBD_ALLOC_PTR(work);
+		if (work == NULL) {
+			CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
+			       obd_uuid2str(uuid));
+			continue;
+		}
+
+		cfs_list_add_tail(&work->gl_list, &gl_list);
+		work->gl_lock  = LDLM_LOCK_GET(lock);
+		work->gl_flags = 0;
+		work->gl_desc  = desc;
+
+	}
+	unlock_res(res);
+
+	if (cfs_list_empty(&gl_list)) {
+		CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
+		RETURN(0);
+	}
+
+	/* issue glimpse callbacks to all connected slaves */
+	rc = ldlm_glimpse_locks(res, &gl_list);
+
+	cfs_list_for_each_safe(pos, tmp, &gl_list) {
+		struct ldlm_glimpse_work *work;
+
+		work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
+
+		cfs_list_del(&work->gl_list);
+		CERROR("%s: failed to notify %s of new quota settings\n",
+		       qmt->qmt_svname,
+		       obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
+		LDLM_LOCK_RELEASE(work->gl_lock);
+		OBD_FREE_PTR(work);
+	}
+
+	RETURN(rc);
+}
+
+/*
+ * Send glimpse request to all global quota locks to push new quota settings to
+ * slaves.
+ *
+ * \param env - is the environment passed by the caller
+ * \param lqe - is the lquota entry which has new settings
+ * \param ver - is the version associated with the setting change
+ */
+void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
+			 __u64 ver)
+{
+	struct qmt_thread_info	*qti = qmt_info(env);
+	struct qmt_pool_info	*pool = lqe2qpi(lqe);
+	struct ldlm_resource	*res = NULL;
+	int			 rc;
+	ENTRY;
+
+	lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
+			    pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
+
+	/* send glimpse callback to notify slaves of new quota settings */
+	qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
+	qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
+	qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+	qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+	qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
+
+	/* look up ldlm resource associated with global index */
+	fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
+	res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
+				LDLM_PLAIN, 0);
+	if (res == NULL) {
+		/* this might happen if no slaves have enqueued global quota
+		 * locks yet */
+		LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
+			     "with "DFID, PFID(&qti->qti_fid));
+		RETURN_EXIT;
+	}
+
+	rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
+			      NULL, NULL);
+	ldlm_resource_putref(res);
+	EXIT;
+}
+
+/* Callback function used to select locks that should be glimpsed when
+ * broadcasting the new qunit value */
+static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
+			  struct obd_uuid *uuid, union ldlm_gl_desc *desc,
+			  void *arg)
+{
+	struct obd_uuid	*slv_uuid = arg;
+
+	if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
+		RETURN(0);
+	RETURN(+1);
+}
+
+/*
+ * Send glimpse request on per-ID lock to push new qunit value to slave.
+ *
+ * \param env  - is the environment passed by the caller
+ * \param qmt  - is the quota master target device
+ * \param lqe  - is the lquota entry with the new qunit value
+ * \param uuid - is the uuid of the slave acquiring space, if any
+ */
+static void qmt_id_lock_glimpse(const struct lu_env *env,
+				struct qmt_device *qmt,
+				struct lquota_entry *lqe, struct obd_uuid *uuid)
+{
+	struct qmt_thread_info	*qti = qmt_info(env);
+	struct qmt_pool_info	*pool = lqe2qpi(lqe);
+	struct ldlm_resource	*res = NULL;
+	int			 rc;
+	ENTRY;
+
+	if (!lqe->lqe_enforced)
+		RETURN_EXIT;
+
+	lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
+			    pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
+	fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
+	res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
+				0);
+	if (res == NULL) {
+		/* this might legitimately happen if slaves haven't had the
+		 * opportunity to enqueue a quota lock yet. */
+		LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
+			     "lock "DFID, PFID(&qti->qti_fid));
+		RETURN_EXIT;
+	}
+
+	lqe_read_lock(lqe);
+	/* The purpose of glimpse callback on per-ID lock is twofold:
+	 * - notify slaves of new qunit value and hope they will release some
+	 *   spare quota space in return
+	 * - notify slaves that master ran out of quota space and there is no
+	 *   need to send acquire requests any more until further notice */
+
+	/* fill glimpse descriptor with lqe settings */
+	if (lqe->lqe_edquot)
+		qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
+	else
+		qti->qti_gl_desc.lquota_desc.gl_flags = 0;
+	qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
+	lqe_read_unlock(lqe);
+
+	/* The rebalance thread is the only thread which can issue glimpses */
+	LASSERT(!lqe->lqe_gl);
+	lqe->lqe_gl = true;
+
+	/* issue glimpse callback to slaves */
+	rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
+			      uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
+
+	LASSERT(lqe->lqe_gl);
+	lqe->lqe_gl = false;
+
+	ldlm_resource_putref(res);
+	EXIT;
+}
+
+/*
+ * Schedule a glimpse request on per-ID locks to push new qunit value or
+ * edquot flag to quota slaves.
+ *
+ * \param qmt - is the quota master target device
+ * \param lqe - is the lquota entry with the new qunit value
+ */
+void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
+{
+	bool	added = false;
+	ENTRY;
+
+	lqe_getref(lqe);
+	cfs_spin_lock(&qmt->qmt_reba_lock);
+	if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
+		cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
+		added = true;
+	}
+	cfs_spin_unlock(&qmt->qmt_reba_lock);
+
+	if (added)
+		cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
+	else
+		lqe_putref(lqe);
+	EXIT;
+}
+
+/*
+ * The rebalance thread is in charge of sending glimpse callbacks on per-ID
+ * quota locks owned by slaves in order to notify them of:
+ * - a qunit shrink, in which case slaves might release quota space back in
+ *   the glimpse reply.
+ * - a set/cleared edquot flag, used to cache the "quota exhausted" state of
+ *   the master. When the flag is set, slaves know that there is no need to
+ *   try to acquire quota from the master since the latter has already
+ *   distributed all the space.
+ */
+static int qmt_reba_thread(void *arg)
+{
+	struct qmt_device	*qmt = (struct qmt_device *)arg;
+	struct ptlrpc_thread	*thread = &qmt->qmt_reba_thread;
+	struct l_wait_info	 lwi = { 0 };
+	struct lu_env		*env;
+	struct lquota_entry	*lqe, *tmp;
+	char			 pname[MTI_NAME_MAXLEN];
+	int			 rc;
+	ENTRY;
+
+	OBD_ALLOC_PTR(env);
+	if (env == NULL)
+		RETURN(-ENOMEM);
+
+	rc = lu_env_init(env, LCT_MD_THREAD);
+	if (rc) {
+		CERROR("%s: failed to init env\n", qmt->qmt_svname);
+		OBD_FREE_PTR(env);
+		RETURN(rc);
+	}
+
+	snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname);
+	cfs_daemonize(pname);
+
+	thread_set_flags(thread, SVC_RUNNING);
+	cfs_waitq_signal(&thread->t_ctl_waitq);
+
+	while (1) {
+		l_wait_event(thread->t_ctl_waitq,
+			     !cfs_list_empty(&qmt->qmt_reba_list) ||
+			     !thread_is_running(thread), &lwi);
+
+		cfs_spin_lock(&qmt->qmt_reba_lock);
+		cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
+					     lqe_link) {
+			cfs_list_del_init(&lqe->lqe_link);
+			cfs_spin_unlock(&qmt->qmt_reba_lock);
+
+			if (thread_is_running(thread))
+				qmt_id_lock_glimpse(env, qmt, lqe, NULL);
+
+			lqe_putref(lqe);
+			cfs_spin_lock(&qmt->qmt_reba_lock);
+		}
+		cfs_spin_unlock(&qmt->qmt_reba_lock);
+
+		if (!thread_is_running(thread))
+			break;
+	}
+	lu_env_fini(env);
+	OBD_FREE_PTR(env);
+	thread_set_flags(thread, SVC_STOPPED);
+	cfs_waitq_signal(&thread->t_ctl_waitq);
+	RETURN(rc);
+}
+
+/*
+ * Start rebalance thread. Called when the QMT is being set up.
+ */
+int qmt_start_reba_thread(struct qmt_device *qmt)
+{
+	struct ptlrpc_thread	*thread = &qmt->qmt_reba_thread;
+	struct l_wait_info	 lwi = { 0 };
+	int			 rc;
+	ENTRY;
+
+	rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0);
+	if (rc < 0) {
+		CERROR("%s: failed to start rebalance thread (%d)\n",
+		       qmt->qmt_svname, rc);
+		thread_set_flags(thread, SVC_STOPPED);
+		RETURN(rc);
+	}
+
+	l_wait_event(thread->t_ctl_waitq,
+		     thread_is_running(thread) || thread_is_stopped(thread),
+		     &lwi);
+
+	RETURN(0);
+}
+
+/*
+ * Stop rebalance thread. Called when the QMT is about to shut down.
+ */ +void qmt_stop_reba_thread(struct qmt_device *qmt) +{ + struct ptlrpc_thread *thread = &qmt->qmt_reba_thread; + + if (!thread_is_stopped(thread)) { + struct l_wait_info lwi = { 0 }; + + thread_set_flags(thread, SVC_STOPPING); + cfs_waitq_signal(&thread->t_ctl_waitq); + + l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread), + &lwi); + } + LASSERT(cfs_list_empty(&qmt->qmt_reba_list)); } diff --git a/lustre/tests/Makefile.am b/lustre/tests/Makefile.am index 9eb7d6d..da3ba35 100644 --- a/lustre/tests/Makefile.am +++ b/lustre/tests/Makefile.am @@ -9,6 +9,7 @@ AM_CFLAGS = $(LLCFLAGS) DIST_SUBDIRS = mpi noinst_DATA = disk1_8.tar.bz2 +noinst_DATA += admin_quotafile_v2.usr admin_quotafile_v2.grp noinst_SCRIPTS = leak_finder.pl llmount.sh llmountcleanup.sh functions.sh noinst_SCRIPTS += test-framework.sh runvmstat runiozone runtests noinst_SCRIPTS += sanity.sh rundbench acceptance-small.sh compile.sh diff --git a/lustre/tests/sanity-quota.sh b/lustre/tests/sanity-quota.sh index c0912ae..b08e250 100644 --- a/lustre/tests/sanity-quota.sh +++ b/lustre/tests/sanity-quota.sh @@ -11,7 +11,7 @@ export PATH=$PWD/$SRCDIR:$SRCDIR:$PWD/$SRCDIR/../utils:$PATH:/sbin ONLY=${ONLY:-"$*"} # only accounting tests and test 22 are supported for the time being -ALWAYS_EXCEPT="0 1 2 3 4 5 6 7 8 9 10 11 12 13 15 17 18 19 20 21 23 24 27 30 36 $SANITY_QUOTA_EXCEPT" +ALWAYS_EXCEPT="0 1 2 3 4 5 6 7 8 9 10 11 12 13 15 17 18 19 20 21 23 24 27 30 $SANITY_QUOTA_EXCEPT" # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! [ "$ALWAYS_EXCEPT$EXCEPT" ] && @@ -1994,8 +1994,12 @@ test_36() { echo "Copy admin quota files into MDT0..." local mntpt=$(facet_mntpt $SINGLEMDS) local mdt0_fstype=$(facet_fstype $SINGLEMDS) - echo "$mdt0_node, $mdt0_dev, $mntpt" - do_node $mdt0_node mount -t $mdt0_fstype $MDS_MOUNT_OPTS $mdt0_dev $mntpt + local opt + if ! 
+		opt="-o loop"
+	fi
+	echo "$mdt0_node, $mdt0_dev, $mntpt, $opt"
+	do_node $mdt0_node mount -t $mdt0_fstype $opt $mdt0_dev $mntpt
 	do_node $mdt0_node mkdir $mntpt/OBJECTS
 	do_node $mdt0_node cp $LUSTRE/tests/admin_quotafile_v2.usr $mntpt/OBJECTS
 	do_node $mdt0_node cp $LUSTRE/tests/admin_quotafile_v2.grp $mntpt/OBJECTS
@@ -2010,7 +2014,7 @@ test_36() {
 
 	local proc="qmt.*.md-0x0.glb-usr"
 	id_cnt=$(do_node $mdt0_node $LCTL get_param -n $proc | wc -l)
-	[ $id_cnt -eq 201 ] || error "Migrate inode user limit failed"
+	[ $id_cnt -eq 401 ] || error "Migrate inode user limit failed: $id_cnt"
 	limit=$(getquota -u 1 global isoftlimit)
 	[ $limit -eq 1024 ] || error "User inode softlimit: $limit"
 	limit=$(getquota -u 1 global ihardlimit)
@@ -2018,7 +2022,7 @@ test_36() {
 
 	proc="qmt.*.md-0x0.glb-grp"
 	id_cnt=$(do_node $mdt0_node $LCTL get_param -n $proc | wc -l)
-	[ $id_cnt -eq 201 ] || error "Migrate inode group limit failed"
+	[ $id_cnt -eq 401 ] || error "Migrate inode group limit failed: $id_cnt"
 	limit=$(getquota -g 1 global isoftlimit)
 	[ $limit -eq 1024 ] || error "Group inode softlimit: $limit"
 	limit=$(getquota -g 1 global ihardlimit)
@@ -2026,7 +2030,7 @@ test_36() {
 
 	proc="qmt.*.dt-0x0.glb-usr"
 	id_cnt=$(do_node $mdt0_node $LCTL get_param -n $proc | wc -l)
-	[ $id_cnt -eq 201 ] || error "Migrate block user limit failed"
+	[ $id_cnt -eq 401 ] || error "Migrate block user limit failed: $id_cnt"
 	limit=$(getquota -u 60001 global bsoftlimit)
 	[ $limit -eq 10485760 ] || error "User block softlimit: $limit"
 	limit=$(getquota -u 60001 global bhardlimit)
@@ -2034,7 +2038,7 @@ test_36() {
 
 	proc="qmt.*.dt-0x0.glb-grp"
 	id_cnt=$(do_node $mdt0_node $LCTL get_param -n $proc | wc -l)
-	[ $id_cnt -eq 201 ] || error "Migrate block user limit failed"
+	[ $id_cnt -eq 401 ] || error "Migrate block group limit failed: $id_cnt"
 	limit=$(getquota -g 60001 global bsoftlimit)
 	[ $limit -eq 10485760 ] || error "Group block softlimit: $limit"
 	limit=$(getquota -g 60001 global bhardlimit)
-- 
1.8.3.1
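
A minimal sketch of how a slave might interpret the lquota_lvb it gets back
from the QMT, based only on the lvb fields filled in qmt_lvbo_fill() above.
The function name is hypothetical and the usual Lustre quota headers are
assumed to be included; this is illustration, not part of the patch.

	static void example_slave_read_lvb(struct lquota_lvb *lvb,
					   bool global_lock)
	{
		if (global_lock) {
			/* the lvb of a global quota lock carries the version
			 * of the global index; a version bump tells the slave
			 * to re-read the index */
			CDEBUG(D_QUOTA, "glb ver "LPU64"\n",
			       lvb->lvb_glb_ver);
		} else {
			/* the lvb of a per-ID lock carries the current qunit
			 * value and the edquot flag */
			if (lvb->lvb_flags & LQUOTA_FL_EDQUOT)
				CDEBUG(D_QUOTA, "master has no space left\n");
			CDEBUG(D_QUOTA, "qunit "LPU64"\n",
			       lvb->lvb_id_qunit);
		}
	}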