X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fqsd_lock.c;h=f5e24a3618869ff233a067a6f3cf8a2e2f9d4262;hb=4fed33473ca2964ff19f61fdb8501b2210f923de;hp=139ba4949bdb03b673871daa27ed5f416c7fed97;hpb=73ef5dcd8bcf32e4127e0011357b8bc39472cdba;p=fs%2Flustre-release.git diff --git a/lustre/quota/qsd_lock.c b/lustre/quota/qsd_lock.c index 139ba49..f5e24a3 100644 --- a/lustre/quota/qsd_lock.c +++ b/lustre/quota/qsd_lock.c @@ -21,7 +21,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, 2013, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. * Use is subject to license terms. * * Author: Johann Lombardi @@ -32,12 +32,13 @@ #include #include +#include #include "qsd_internal.h" typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock, - struct ldlm_lock_desc *desc, void *data, - int flag); + struct ldlm_lock_desc *desc, void *data, + int flag); static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast; typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data); @@ -66,24 +67,26 @@ struct ldlm_enqueue_info qsd_id_einfo = { * \param reset - whether lock->l_ast_data should be cleared */ static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock, - bool reset) { + bool reset) +{ struct qsd_qtype_info *qqi; + ENTRY; lock_res_and_lock(lock); qqi = lock->l_ast_data; - if (qqi != NULL) { + if (qqi) { qqi_getref(qqi); if (reset) lock->l_ast_data = NULL; } unlock_res_and_lock(lock); - if (qqi != NULL) + if (qqi) /* it is not safe to call lu_ref_add() under spinlock */ lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock); - if (reset && qqi != NULL) { + if (reset && qqi) { /* release qqi reference hold for the lock */ lu_ref_del(&qqi->qqi_reference, "glb_lock", lock); qqi_putref(qqi); @@ -99,20 +102,22 @@ static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock, * \param reset - whether lock->l_ast_data should be cleared */ static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock, - bool reset) { + bool reset) +{ struct lquota_entry *lqe; + ENTRY; lock_res_and_lock(lock); lqe = lock->l_ast_data; - if (lqe != NULL) { + if (lqe) { lqe_getref(lqe); if (reset) lock->l_ast_data = NULL; } unlock_res_and_lock(lock); - if (reset && lqe != NULL) + if (reset && lqe) /* release lqe reference hold for the lock */ lqe_putref(lqe); RETURN(lqe); @@ -136,18 +141,22 @@ static int qsd_common_glimpse_ast(struct ptlrpc_request *req, struct ldlm_gl_lquota_desc **desc, void **lvb) { int rc; + ENTRY; LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK); /* glimpse on quota locks always packs a glimpse descriptor */ - req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK); + req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK_DESC); /* extract glimpse descriptor */ *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC); - if (*desc == NULL) + if (!*desc) RETURN(-EFAULT); + if (ptlrpc_req_need_swab(req)) + lustre_swab_gl_lquota_desc(*desc); + /* prepare reply */ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, sizeof(struct lquota_lvb)); @@ -178,9 +187,10 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock, int flag) { int rc = 0; + ENTRY; - switch(flag) { + switch (flag) { case LDLM_CB_BLOCKING: { struct lustre_handle lockh; @@ -195,11 +205,13 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock, LDLM_DEBUG(lock, "canceling global quota lock"); qqi = qsd_glb_ast_data_get(lock, true); - if (qqi == NULL) + if (!qqi) break; - /* we are losing the global index lock, so let's mark the - * global & slave indexes as not up-to-date any more */ + /* + * we are losing the global index lock, so let's mark the + * global & slave indexes as not up-to-date any more + */ write_lock(&qqi->qqi_qsd->qsd_lock); qqi->qqi_glb_uptodate = false; qqi->qqi_slv_uptodate = false; @@ -208,11 +220,13 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock, write_unlock(&qqi->qqi_qsd->qsd_lock); CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n", - qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype))); + qqi->qqi_qsd->qsd_svname, qtype_name((qqi->qqi_qtype))); - /* kick off reintegration thread if not running already, if + /* + * kick off reintegration thread if not running already, if * it's just local cancel (for stack clean up or eviction), - * don't re-trigger the reintegration. */ + * don't re-trigger the reintegration. + */ if (!ldlm_is_local_only(lock)) qsd_start_reint_thread(qqi); @@ -221,12 +235,55 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock, break; } default: - LASSERTF(0, "invalid flags for blocking ast %d", flag); + LASSERTF(0, "invalid flags for blocking ast %d\n", flag); } RETURN(rc); } +static int qsd_entry_def_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, + struct hlist_node *hnode, void *data) +{ + struct qsd_qtype_info *qqi = (struct qsd_qtype_info *)data; + struct lquota_entry *lqe; + + lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash); + LASSERT(atomic_read(&lqe->lqe_ref) > 0); + + if (lqe->lqe_id.qid_uid == 0 || !lqe->lqe_is_default) + return 0; + + lqe_write_lock(lqe); + if (qqi->qqi_default_hardlimit == 0 && qqi->qqi_default_softlimit == 0) + lqe->lqe_enforced = false; + else + lqe->lqe_enforced = true; + lqe_write_unlock(lqe); + + return 0; +} + +/* Update the quota entries after receiving default quota update + * + * \param qqi - is the qsd_qtype_info associated with the quota entries + * \param hardlimit - new hardlimit of default quota + * \param softlimit - new softlimit of default quota + * \param gracetime - new gracetime of default quota + */ +void qsd_update_default_quota(struct qsd_qtype_info *qqi, __u64 hardlimit, + __u64 softlimit, __u64 gracetime) +{ + CDEBUG(D_QUOTA, "%s: update default quota setting from QMT.\n", + qqi->qqi_qsd->qsd_svname); + + qqi->qqi_default_hardlimit = hardlimit; + qqi->qqi_default_softlimit = softlimit; + qqi->qqi_default_gracetime = gracetime; + + cfs_hash_for_each_safe(qqi->qqi_site->lqs_hash, + qsd_entry_def_iter_cb, qqi); +} + /* * Glimpse callback handler for global quota lock. * @@ -235,12 +292,13 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock, */ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data) { - struct ptlrpc_request *req = data; - struct qsd_qtype_info *qqi; - struct ldlm_gl_lquota_desc *desc; - struct lquota_lvb *lvb; - struct lquota_glb_rec rec; - int rc; + struct ptlrpc_request *req = data; + struct qsd_qtype_info *qqi; + struct ldlm_gl_lquota_desc *desc; + struct lquota_lvb *lvb; + struct lquota_glb_rec rec; + int rc; + ENTRY; rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb); @@ -248,17 +306,18 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data) GOTO(out, rc); qqi = qsd_glb_ast_data_get(lock, false); - if (qqi == NULL) + if (!qqi) /* valid race */ GOTO(out, rc = -ELDLM_NO_LOCK_DATA); - CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64 - " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname, + CDEBUG(D_QUOTA, + "%s: glimpse on glb quota locks, id:%llu ver:%llu hard:%llu soft:%llu\n", + qqi->qqi_qsd->qsd_svname, desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit, desc->gl_softlimit); if (desc->gl_ver == 0) { - CERROR("%s: invalid global index version "LPU64"\n", + CERROR("%s: invalid global index version %llu\n", qqi->qqi_qsd->qsd_svname, desc->gl_ver); GOTO(out_qqi, rc = -EINVAL); } @@ -269,8 +328,14 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data) rec.qbr_time = desc->gl_time; rec.qbr_granted = 0; - /* We can't afford disk io in the context of glimpse callback handling - * thread, so the on-disk global limits update has to be deferred. */ + if (desc->gl_id.qid_uid == 0) + qsd_update_default_quota(qqi, desc->gl_hardlimit, + desc->gl_softlimit, desc->gl_time); + + /* + * We can't afford disk io in the context of glimpse callback handling + * thread, so the on-disk global limits update has to be deferred. + */ qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec, desc->gl_ver, true); EXIT; @@ -292,14 +357,16 @@ out: * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish * cancellation and blocking ast's. */ -static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, +static int qsd_id_blocking_ast(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, void *data, int flag) { - struct lustre_handle lockh; - int rc = 0; + struct lustre_handle lockh; + int rc = 0; + ENTRY; - switch(flag) { + switch (flag) { case LDLM_CB_BLOCKING: { LDLM_DEBUG(lock, "blocking AST on ID quota lock"); @@ -308,12 +375,12 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de break; } case LDLM_CB_CANCELING: { - struct lu_env *env; - struct lquota_entry *lqe; + struct lu_env *env; + struct lquota_entry *lqe; - LDLM_DEBUG(lock, "canceling global quota lock"); + LDLM_DEBUG(lock, "canceling ID quota lock"); lqe = qsd_id_ast_data_get(lock, true); - if (lqe == NULL) + if (!lqe) break; LQUOTA_DEBUG(lqe, "losing ID lock"); @@ -328,17 +395,21 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de } lqe_write_unlock(lqe); - /* If there is dqacq inflight, the release will be skipped + /* + * If there is dqacq inflight, the release will be skipped * at this time, and triggered on dqacq completion later, * which means there could be a short window that slave is - * holding spare grant wihtout per-ID lock. */ + * holding spare grant wihtout per-ID lock. + */ - /* don't release quota space for local cancel (stack clean - * up or eviction) */ + /* + * don't release quota space for local cancel (stack clean + * up or eviction) + */ if (!ldlm_is_local_only(lock)) { /* allocate environment */ OBD_ALLOC_PTR(env); - if (env == NULL) { + if (!env) { lqe_putref(lqe); rc = -ENOMEM; break; @@ -363,7 +434,7 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de break; } default: - LASSERTF(0, "invalid flags for blocking ast %d", flag); + LASSERTF(0, "invalid flags for blocking ast %d\n", flag); } RETURN(rc); @@ -377,13 +448,13 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de */ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) { - struct ptlrpc_request *req = data; - struct lquota_entry *lqe; - struct qsd_instance *qsd; - struct ldlm_gl_lquota_desc *desc; - struct lquota_lvb *lvb; - int rc; - bool wakeup = false; + struct ptlrpc_request *req = data; + struct lquota_entry *lqe; + struct ldlm_gl_lquota_desc *desc; + struct lquota_lvb *lvb; + int rc; + bool wakeup = false; + ENTRY; rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb); @@ -391,15 +462,13 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) GOTO(out, rc); lqe = qsd_id_ast_data_get(lock, false); - if (lqe == NULL) + if (!lqe) /* valid race */ GOTO(out, rc = -ELDLM_NO_LOCK_DATA); - LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64, + LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu", desc->gl_qunit); - qsd = lqe2qqi(lqe)->qqi_qsd; - lqe_write_lock(lqe); lvb->lvb_id_rel = 0; if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) { @@ -415,22 +484,24 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) if (space > 0) { if (lqe->lqe_pending_req > 0) { - LQUOTA_DEBUG(lqe, "request in flight, postpone " - "release of "LPD64, space); + LQUOTA_DEBUG(lqe, + "request in flight, postpone release of %lld", + space); lvb->lvb_id_may_rel = space; } else { lqe->lqe_pending_req++; /* release quota space in glimpse reply */ - LQUOTA_DEBUG(lqe, "releasing "LPD64, space); + LQUOTA_DEBUG(lqe, "releasing %lld", space); lqe->lqe_granted -= space; lvb->lvb_id_rel = space; lqe_write_unlock(lqe); /* change the lqe_granted */ - qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id, - (union lquota_rec *)&lqe->lqe_granted, - 0, false); + qsd_upd_schedule(lqe2qqi(lqe), lqe, + &lqe->lqe_id, + (union lquota_rec *) + &lqe->lqe_granted, 0, false); lqe_write_lock(lqe); lqe->lqe_pending_req--; @@ -461,8 +532,9 @@ out: */ int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh) { - struct ldlm_lock *lock; - int rc; + struct ldlm_lock *lock; + int rc; + ENTRY; LASSERT(lockh); @@ -477,12 +549,14 @@ int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh) LASSERT(lustre_handle_is_used(lockh)); ldlm_lock_dump_handle(D_QUOTA, lockh); - if (rlockh == NULL) + if (!rlockh) /* caller not interested in remote handle */ RETURN(0); - /* look up lock associated with local handle and extract remote handle - * to be packed in quota request */ + /* + * look up lock associated with local handle and extract remote handle + * to be packed in quota request + */ lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); lustre_handle_copy(rlockh, &lock->l_remote_handle); @@ -493,8 +567,9 @@ int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh) int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe) { - struct qsd_thread_info *qti = qsd_info(env); - int rc; + struct qsd_thread_info *qti = qsd_info(env); + int rc; + ENTRY; lqe_write_lock(lqe);