X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fqsd_lock.c;h=c63a4e330ae955b8a0da3e92aaafc812021baa48;hb=3a55e357715492cb195bfeb19ffa34d39f9c6926;hp=a21505d6a26b80acce04fea4e1aa358d2038bc68;hpb=9fb46705ae86aa2c0ac29427f0ff24f923560eb7;p=fs%2Flustre-release.git diff --git a/lustre/quota/qsd_lock.c b/lustre/quota/qsd_lock.c index a21505d..c63a4e3 100644 --- a/lustre/quota/qsd_lock.c +++ b/lustre/quota/qsd_lock.c @@ -21,24 +21,45 @@ * GPL HEADER END */ /* - * Copyright (c) 2011, 2012, Intel, Inc. + * Copyright (c) 2012, 2016, Intel Corporation. * Use is subject to license terms. * * Author: Johann Lombardi * Author: Niu Yawei */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #define DEBUG_SUBSYSTEM S_LQUOTA #include #include +#include #include "qsd_internal.h" +typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, void *data, + int flag); +static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast; + +typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data); +static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast; + +struct ldlm_enqueue_info qsd_glb_einfo = { + .ei_type = LDLM_PLAIN, + .ei_mode = LCK_CR, + .ei_cb_bl = qsd_glb_blocking_ast, + .ei_cb_cp = ldlm_completion_ast, + .ei_cb_gl = qsd_glb_glimpse_ast, +}; + +struct ldlm_enqueue_info qsd_id_einfo = { + .ei_type = LDLM_PLAIN, + .ei_mode = LCK_CR, + .ei_cb_bl = qsd_id_blocking_ast, + .ei_cb_cp = ldlm_completion_ast, + .ei_cb_gl = qsd_id_glimpse_ast, +}; + /* * Return qsd_qtype_info structure associated with a global lock * @@ -54,16 +75,19 @@ static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock, qqi = lock->l_ast_data; if (qqi != NULL) { qqi_getref(qqi); - lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock); if (reset) lock->l_ast_data = NULL; } unlock_res_and_lock(lock); + if (qqi != NULL) + /* it is not safe to call lu_ref_add() under spinlock */ + lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock); + if (reset && qqi != NULL) { /* release qqi reference hold for the lock */ - qqi_putref(qqi); lu_ref_del(&qqi->qqi_reference, "glb_lock", lock); + qqi_putref(qqi); } RETURN(qqi); } @@ -125,7 +149,12 @@ static int qsd_common_glimpse_ast(struct ptlrpc_request *req, if (*desc == NULL) RETURN(-EFAULT); + if (ptlrpc_req_need_swab(req)) + lustre_swab_gl_lquota_desc(*desc); + /* prepare reply */ + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(struct lquota_lvb)); rc = req_capsule_server_pack(&req->rq_pill); if (rc != 0) { CERROR("Can't pack response, rc %d\n", rc); @@ -161,7 +190,7 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock, LDLM_DEBUG(lock, "blocking AST on global quota lock"); ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); + rc = ldlm_cli_cancel(&lockh, LCF_ASYNC); break; } case LDLM_CB_CANCELING: { @@ -183,12 +212,12 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock, write_unlock(&qqi->qqi_qsd->qsd_lock); CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n", - qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype))); + qqi->qqi_qsd->qsd_svname, qtype_name((qqi->qqi_qtype))); /* kick off reintegration thread if not running already, if * it's just local cancel (for stack clean up or eviction), * don't re-trigger the reintegration. */ - if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0) + if (!ldlm_is_local_only(lock)) qsd_start_reint_thread(qqi); lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock); @@ -196,7 +225,7 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock, break; } default: - LASSERTF(0, "invalid flags for blocking ast %d", flag); + LASSERTF(0, "invalid flags for blocking ast %d\n", flag); } RETURN(rc); @@ -227,13 +256,13 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data) /* valid race */ GOTO(out, rc = -ELDLM_NO_LOCK_DATA); - CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64 - " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname, + CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:%llu ver:%llu" + " hard:" "%llu soft:%llu\n", qqi->qqi_qsd->qsd_svname, desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit, desc->gl_softlimit); if (desc->gl_ver == 0) { - CERROR("%s: invalid global index version "LPU64"\n", + CERROR("%s: invalid global index version %llu\n", qqi->qqi_qsd->qsd_svname, desc->gl_ver); GOTO(out_qqi, rc = -EINVAL); } @@ -241,7 +270,7 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data) /* extract new hard & soft limits from the glimpse descriptor */ rec.qbr_hardlimit = desc->gl_hardlimit; rec.qbr_softlimit = desc->gl_softlimit; - rec.qbr_time = 0; + rec.qbr_time = desc->gl_time; rec.qbr_granted = 0; /* We can't afford disk io in the context of glimpse callback handling @@ -257,13 +286,7 @@ out: return rc; } -struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN, - LCK_CR, - qsd_glb_blocking_ast, - ldlm_completion_ast, - qsd_glb_glimpse_ast, - NULL, NULL }; -/* +/** * Blocking callback handler for per-ID lock * * \param lock - is the lock for which ast occurred. @@ -285,70 +308,66 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de LDLM_DEBUG(lock, "blocking AST on ID quota lock"); ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); + rc = ldlm_cli_cancel(&lockh, LCF_ASYNC); break; } case LDLM_CB_CANCELING: { struct lu_env *env; struct lquota_entry *lqe; - bool rel = false; - LDLM_DEBUG(lock, "canceling global quota lock"); + LDLM_DEBUG(lock, "canceling ID quota lock"); lqe = qsd_id_ast_data_get(lock, true); if (lqe == NULL) break; LQUOTA_DEBUG(lqe, "losing ID lock"); - /* just local cancel (for stack clean up or eviction), don't - * release quota space in this case */ - if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) { - lqe_putref(lqe); - break; - } - - /* allocate environment */ - OBD_ALLOC_PTR(env); - if (env == NULL) { - lqe_putref(lqe); - rc = -ENOMEM; - break; - } - - /* initialize environment */ - rc = lu_env_init(env, LCT_DT_THREAD); - if (rc) { - OBD_FREE_PTR(env); - lqe_putref(lqe); - break; - } - ldlm_lock2handle(lock, &lockh); lqe_write_lock(lqe); if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) { /* Clear lqe_lockh & reset qunit to 0 */ qsd_set_qunit(lqe, 0); memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh)); - lqe->lqe_edquot = false; - rel = true; + qsd_set_edquot(lqe, false); } lqe_write_unlock(lqe); - /* If there is qqacq inflight, the release will be skipped + /* If there is dqacq inflight, the release will be skipped * at this time, and triggered on dqacq completion later, * which means there could be a short window that slave is * holding spare grant wihtout per-ID lock. */ - if (rel) + + /* don't release quota space for local cancel (stack clean + * up or eviction) */ + if (!ldlm_is_local_only(lock)) { + /* allocate environment */ + OBD_ALLOC_PTR(env); + if (env == NULL) { + lqe_putref(lqe); + rc = -ENOMEM; + break; + } + + /* initialize environment */ + rc = lu_env_init(env, LCT_DT_THREAD); + if (rc) { + OBD_FREE_PTR(env); + lqe_putref(lqe); + break; + } + rc = qsd_adjust(env, lqe); + lu_env_fini(env); + OBD_FREE_PTR(env); + } + /* release lqe reference grabbed by qsd_id_ast_data_get() */ lqe_putref(lqe); - lu_env_fini(env); - OBD_FREE_PTR(env); break; } default: - LASSERTF(0, "invalid flags for blocking ast %d", flag); + LASSERTF(0, "invalid flags for blocking ast %d\n", flag); } RETURN(rc); @@ -364,7 +383,6 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) { struct ptlrpc_request *req = data; struct lquota_entry *lqe; - struct qsd_instance *qsd; struct ldlm_gl_lquota_desc *desc; struct lquota_lvb *lvb; int rc; @@ -380,11 +398,9 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) /* valid race */ GOTO(out, rc = -ELDLM_NO_LOCK_DATA); - LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64, + LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu", desc->gl_qunit); - qsd = lqe2qqi(lqe)->qqi_qsd; - lqe_write_lock(lqe); lvb->lvb_id_rel = 0; if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) { @@ -401,13 +417,13 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) if (space > 0) { if (lqe->lqe_pending_req > 0) { LQUOTA_DEBUG(lqe, "request in flight, postpone " - "release of "LPD64, space); + "release of %lld", space); lvb->lvb_id_may_rel = space; } else { lqe->lqe_pending_req++; /* release quota space in glimpse reply */ - LQUOTA_DEBUG(lqe, "releasing "LPD64, space); + LQUOTA_DEBUG(lqe, "releasing %lld", space); lqe->lqe_granted -= space; lvb->lvb_id_rel = space; @@ -424,25 +440,18 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) } } - lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT); + qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT)); lqe_write_unlock(lqe); if (wakeup) - cfs_waitq_broadcast(&lqe->lqe_waiters); + wake_up_all(&lqe->lqe_waiters); lqe_putref(lqe); out: req->rq_status = rc; RETURN(rc); } -struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN, - LCK_CR, - qsd_id_blocking_ast, - ldlm_completion_ast, - qsd_id_glimpse_ast, - NULL, NULL }; - -/* +/** * Check whether a slave already own a ldlm lock for the quota identifier \qid. * * \param lockh - is the local lock handle from lquota entry. @@ -470,6 +479,7 @@ int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh) ldlm_lock_dump_handle(D_QUOTA, lockh); if (rlockh == NULL) + /* caller not interested in remote handle */ RETURN(0); /* look up lock associated with local handle and extract remote handle @@ -499,7 +509,7 @@ int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe) if (lustre_handle_is_used(&qti->qti_lockh)) { memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh)); qsd_set_qunit(lqe, 0); - lqe->lqe_edquot = false; + qsd_set_edquot(lqe, false); } lqe_write_unlock(lqe);