4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2012, 2017, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
31 #define DEBUG_SUBSYSTEM S_LQUOTA
33 #include <lustre_dlm.h>
34 #include <obd_class.h>
35 #include <lustre_swab.h>
37 #include "qsd_internal.h"
/*
 * Function types for the quota lock callbacks, followed by forward
 * declarations of the handlers using those types so that the enqueue-info
 * initializers can reference them before their definitions.
 * NOTE(review): the last parameter line of enqi_bl_cb_t is not visible in
 * this listing.
 */
39 typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
40 struct ldlm_lock_desc *desc, void *data,
/* blocking-AST handlers for the global and the per-ID quota locks */
42 static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;
/* glimpse-AST handler type: takes the lock and an opaque data pointer */
44 typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
/* glimpse-AST handlers for the global and the per-ID quota locks */
45 static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast;
/*
 * Enqueue parameters for the global quota lock: ei_type is LDLM_PLAIN,
 * the blocking and glimpse callbacks are the qsd_glb_* handlers and the
 * completion callback is ldlm_completion_ast.
 * NOTE(review): not every initializer line is visible in this listing.
 */
47 struct ldlm_enqueue_info qsd_glb_einfo = {
48 .ei_type = LDLM_PLAIN,
50 .ei_cb_bl = qsd_glb_blocking_ast,
51 .ei_cb_cp = ldlm_completion_ast,
52 .ei_cb_gl = qsd_glb_glimpse_ast,
/*
 * Enqueue parameters for the per-ID quota locks: ei_type is LDLM_PLAIN,
 * the blocking and glimpse callbacks are the qsd_id_* handlers and the
 * completion callback is ldlm_completion_ast.  Its ei_mode field is read
 * by qsd_id_lock_match() and qsd_id_lock_cancel().
 * NOTE(review): not every initializer line is visible in this listing.
 */
55 struct ldlm_enqueue_info qsd_id_einfo = {
56 .ei_type = LDLM_PLAIN,
58 .ei_cb_bl = qsd_id_blocking_ast,
59 .ei_cb_cp = ldlm_completion_ast,
60 .ei_cb_gl = qsd_id_glimpse_ast,
64 * Return qsd_qtype_info structure associated with a global lock
66 * \param lock - is the global lock from which we should extract the qqi
67 * \param reset - whether lock->l_ast_data should be cleared
/*
 * Fetch the qsd_qtype_info stored in lock->l_ast_data.  The pointer is
 * read, and may be reset to NULL, between lock_res_and_lock() and
 * unlock_res_and_lock(); the lu_ref bookkeeping is done after unlocking.
 * NOTE(review): the second parameter, the conditions guarding the reset
 * and the lu_ref calls, and the return statement are not visible in this
 * listing.
 */
69 static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
72 struct qsd_qtype_info *qqi;
/* read (and possibly clear) l_ast_data while lock and resource are held */
76 lock_res_and_lock(lock);
77 qqi = lock->l_ast_data;
81 lock->l_ast_data = NULL;
83 unlock_res_and_lock(lock);
86 /* it is not safe to call lu_ref_add() under spinlock */
/* callers in this file drop this "ast_data_get" lu_ref when they are done */
87 lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
90 /* release qqi reference held for the lock */
91 lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
98 * Return lquota entry structure associated with a per-ID lock
100 * \param lock - is the per-ID lock from which we should extract the lquota
102 * \param reset - whether lock->l_ast_data should be cleared
/*
 * Per-ID counterpart of qsd_glb_ast_data_get(): fetch the lquota_entry
 * stored in lock->l_ast_data.  The pointer is read, and may be reset to
 * NULL, between lock_res_and_lock() and unlock_res_and_lock().
 * NOTE(review): the second parameter, the condition guarding the reset,
 * the reference handling and the return statement are not visible in this
 * listing.
 */
104 static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
107 struct lquota_entry *lqe;
/* read (and possibly clear) l_ast_data while lock and resource are held */
111 lock_res_and_lock(lock);
112 lqe = lock->l_ast_data;
116 lock->l_ast_data = NULL;
118 unlock_res_and_lock(lock);
121 /* release lqe reference held for the lock */
127 * Glimpse callback handler for all quota locks. This function extracts
128 * information from the glimpse request.
132 * \param req - is the glimpse ptlrpc request
133 * \param desc - is the glimpse descriptor describing the purpose of the glimpse
135 * \param lvb - is the pointer to the lvb in the reply buffer
137 * \retval 0 on success, in which case \desc and \lvb point to valid structures,
138 * an appropriate error code on failure
/*
 * Common front end used by both glimpse handlers in this file: checks
 * that the request is a glimpse callback, extracts the lquota glimpse
 * descriptor into *desc, packs a reply sized for a struct lquota_lvb and
 * stores a pointer to that reply buffer in *lvb.
 * NOTE(review): local declarations, error checks and return statements
 * are not visible in this listing.
 */
140 static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
141 struct ldlm_gl_lquota_desc **desc, void **lvb)
/* only LDLM_GL_CALLBACK requests are expected here */
147 LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
149 /* glimpse on quota locks always packs a glimpse descriptor */
150 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK_DESC);
152 /* extract glimpse descriptor */
153 *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
/* byte-swap the descriptor when the request capsule says it is needed */
157 if (req_capsule_req_need_swab(&req->rq_pill))
158 lustre_swab_gl_lquota_desc(*desc);
/* size the server-side LVB field for a struct lquota_lvb, then pack */
161 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
162 sizeof(struct lquota_lvb));
163 rc = req_capsule_server_pack(&req->rq_pill);
165 CERROR("Can't pack response, rc %d\n", rc);
/* hand the LVB buffer of the packed reply back to the caller */
170 *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
176 * Blocking callback handler for global index lock
178 * \param lock - is the lock for which ast occurred.
179 * \param desc - is the description of a conflicting lock in case of blocking
181 * \param data - is the value of lock->l_ast_data
182 * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
183 * cancellation and blocking ast's.
/*
 * Blocking/cancel callback of the global quota lock.
 *  - LDLM_CB_BLOCKING: converts the lock to a handle and calls
 *    ldlm_cli_cancel() on it with LCF_ASYNC.
 *  - LDLM_CB_CANCELING: detaches the qqi from the lock, marks the global
 *    and slave indexes as not up-to-date and, unless the cancel is
 *    local-only, starts the reintegration thread.
 * The trailing LASSERTF(0, ...) reports an unexpected flag value.
 * NOTE(review): the last parameter, the switch statement, NULL checks,
 * break/return statements and braces are not visible in this listing.
 */
185 static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
186 struct ldlm_lock_desc *desc, void *data,
194 case LDLM_CB_BLOCKING: {
195 struct lustre_handle lockh;
197 LDLM_DEBUG(lock, "blocking AST on global quota lock");
/* cancel the lock through its handle; rc carries the cancel result */
198 ldlm_lock2handle(lock, &lockh);
199 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
202 case LDLM_CB_CANCELING: {
203 struct qsd_qtype_info *qqi;
205 LDLM_DEBUG(lock, "canceling global quota lock");
/* fetch the qqi attached to the lock, passing reset == true */
207 qqi = qsd_glb_ast_data_get(lock, true);
212 * we are losing the global index lock, so let's mark the
213 * global & slave indexes as not up-to-date any more
/* the flags and the cached lock handle are updated under qsd_lock */
215 write_lock(&qqi->qqi_qsd->qsd_lock);
216 qqi->qqi_glb_uptodate = false;
217 qqi->qqi_slv_uptodate = false;
/* only wipe the cached handle if its cookie matches this lock */
218 if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
219 memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
220 write_unlock(&qqi->qqi_qsd->qsd_lock);
222 CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
223 qqi->qqi_qsd->qsd_svname, qtype_name((qqi->qqi_qtype)));
226 * kick off reintegration thread if not running already, if
227 * it's just local cancel (for stack clean up or eviction),
228 * don't re-trigger the reintegration.
230 if (!ldlm_is_local_only(lock))
231 qsd_start_reint_thread(qqi);
/* drop the ast_data_get lu_ref added by qsd_glb_ast_data_get() */
233 lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
238 LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
/*
 * Hash iteration callback, passed to cfs_hash_for_each_safe() by
 * qsd_update_default_quota() with the qqi as \data.  For the lquota
 * entry behind \hnode it sets lqe_enforced from the qqi default hard and
 * soft limits.
 * NOTE(review): the body of the first if, the locking call paired with
 * lqe_write_unlock(), the else keyword and the return statement are not
 * visible in this listing.
 */
244 static int qsd_entry_def_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
245 struct hlist_node *hnode, void *data)
247 struct qsd_qtype_info *qqi = (struct qsd_qtype_info *)data;
248 struct lquota_entry *lqe;
/* recover the entry from its hash linkage; it must still be referenced */
250 lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
251 LASSERT(atomic_read(&lqe->lqe_ref) > 0);
/*
 * entries with ID 0 or without lqe_is_default take a separate path
 * (presumably they are skipped; the statement is not visible here)
 */
253 if (lqe->lqe_id.qid_uid == 0 || !lqe->lqe_is_default)
/* both default limits zero: not enforced; line 260 covers the other case */
257 if (qqi->qqi_default_hardlimit == 0 && qqi->qqi_default_softlimit == 0)
258 lqe->lqe_enforced = false;
260 lqe->lqe_enforced = true;
261 lqe_write_unlock(lqe);
266 /* Update the quota entries after receiving default quota update
268 * \param qqi - is the qsd_qtype_info associated with the quota entries
269 * \param hardlimit - new hardlimit of default quota
270 * \param softlimit - new softlimit of default quota
271 * \param gracetime - new gracetime of default quota
/*
 * Record new default quota settings in \qqi and then visit every entry of
 * qqi->qqi_site->lqs_hash with qsd_entry_def_iter_cb(), passing the qqi
 * as callback data.
 */
273 void qsd_update_default_quota(struct qsd_qtype_info *qqi, __u64 hardlimit,
274 __u64 softlimit, __u64 gracetime)
276 CDEBUG(D_QUOTA, "%s: update default quota setting from QMT.\n",
277 qqi->qqi_qsd->qsd_svname);
/* store the new defaults before the iteration so the callback sees them */
279 qqi->qqi_default_hardlimit = hardlimit;
280 qqi->qqi_default_softlimit = softlimit;
281 qqi->qqi_default_gracetime = gracetime;
/* the callback reads the default hard and soft limits stored above */
283 cfs_hash_for_each_safe(qqi->qqi_site->lqs_hash,
284 qsd_entry_def_iter_cb, qqi);
288 * Glimpse callback handler for global quota lock.
290 * \param lock - is the lock targeted by the glimpse
291 * \param data - is a pointer to the glimpse ptlrpc request
/*
 * Glimpse callback of the global quota lock.  After the common glimpse
 * processing it looks up the qqi attached to the lock, rejects a
 * descriptor whose gl_ver is 0 with -EINVAL, copies the limits from the
 * descriptor into a lquota_glb_rec, applies them as default quota when
 * the ID is 0 and schedules a deferred update through qsd_upd_schedule().
 * NOTE(review): the rc declaration, the checks leading to the GOTOs, the
 * CDEBUG opening line, the labels and the return are not visible in this
 * listing.
 */
293 static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
295 struct ptlrpc_request *req = data;
296 struct qsd_qtype_info *qqi;
297 struct ldlm_gl_lquota_desc *desc;
298 struct lquota_lvb *lvb;
299 struct lquota_glb_rec rec;
/* common part: extract the descriptor and prepare the reply LVB */
304 rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
/* fetch the qqi with reset == false, so l_ast_data is left in place */
308 qqi = qsd_glb_ast_data_get(lock, false);
311 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
314 "%s: glimpse on glb quota locks, id:%llu ver:%llu hard:%llu soft:%llu\n",
315 qqi->qqi_qsd->qsd_svname,
316 desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
/* a version of 0 is treated as invalid */
319 if (desc->gl_ver == 0) {
320 CERROR("%s: invalid global index version %llu\n",
321 qqi->qqi_qsd->qsd_svname, desc->gl_ver);
322 GOTO(out_qqi, rc = -EINVAL);
325 /* extract new hard & soft limits from the glimpse descriptor */
326 rec.qbr_hardlimit = desc->gl_hardlimit;
327 rec.qbr_softlimit = desc->gl_softlimit;
328 rec.qbr_time = desc->gl_time;
/* a glimpse for ID 0 carries the default quota settings */
331 if (desc->gl_id.qid_uid == 0)
332 qsd_update_default_quota(qqi, desc->gl_hardlimit,
333 desc->gl_softlimit, desc->gl_time);
336 * We can't afford disk io in the context of glimpse callback handling
337 * thread, so the on-disk global limits update has to be deferred.
339 qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
/* drop the ast_data_get lu_ref added by qsd_glb_ast_data_get() */
343 lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
351 * Blocking callback handler for per-ID lock
353 * \param lock - is the lock for which ast occurred.
354 * \param desc - is the description of a conflicting lock in case of blocking
356 * \param data - is the value of lock->l_ast_data
357 * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
358 * cancellation and blocking ast's.
/*
 * Blocking/cancel callback of a per-ID quota lock.
 *  - LDLM_CB_BLOCKING: converts the lock to a handle and calls
 *    ldlm_cli_cancel() on it with LCF_ASYNC.
 *  - LDLM_CB_CANCELING: detaches the lquota entry from the lock, resets
 *    the entry's lock handle, qunit and edquot state when the cancelled
 *    lock is the one recorded in lqe_lockh, and, unless the cancel is
 *    local-only, sets up a lu_env and calls qsd_adjust().
 * The trailing LASSERTF(0, ...) reports an unexpected flag value.
 * NOTE(review): the switch statement, the env declaration/allocation,
 * error checks, the locking call paired with lqe_write_unlock(),
 * break/return statements and braces are not visible in this listing.
 */
360 static int qsd_id_blocking_ast(struct ldlm_lock *lock,
361 struct ldlm_lock_desc *desc,
362 void *data, int flag)
364 struct lustre_handle lockh;
370 case LDLM_CB_BLOCKING: {
372 LDLM_DEBUG(lock, "blocking AST on ID quota lock");
/* cancel the lock through its handle; rc carries the cancel result */
373 ldlm_lock2handle(lock, &lockh);
374 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
377 case LDLM_CB_CANCELING: {
379 struct lquota_entry *lqe;
381 LDLM_DEBUG(lock, "canceling ID quota lock");
/* fetch the entry attached to the lock, passing reset == true */
382 lqe = qsd_id_ast_data_get(lock, true);
386 LQUOTA_DEBUG(lqe, "losing ID lock");
388 ldlm_lock2handle(lock, &lockh);
/* only reset the entry if it still records this very lock */
390 if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
391 /* Clear lqe_lockh & reset qunit to 0 */
392 qsd_set_qunit(lqe, 0);
393 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
394 qsd_set_edquot(lqe, false);
396 lqe_write_unlock(lqe);
399 * If there is dqacq inflight, the release will be skipped
400 * at this time, and triggered on dqacq completion later,
401 * which means there could be a short window that slave is
402 * holding spare grant wihtout per-ID lock.
406 * don't release quota space for local cancel (stack clean
409 if (!ldlm_is_local_only(lock)) {
410 /* allocate environment */
418 /* initialize environment */
419 rc = lu_env_init(env, LCT_DT_THREAD);
/*
 * NOTE(review): per the surrounding comments this is where spare quota
 * space gets released; qsd_adjust() is defined elsewhere - confirm
 */
426 rc = qsd_adjust(env, lqe);
432 /* release lqe reference grabbed by qsd_id_ast_data_get() */
437 LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
444 * Glimpse callback handler for per-ID quota locks.
446 * \param lock - is the lock targeted by the glimpse
447 * \param data - is a pointer to the glimpse ptlrpc request
/*
 * Glimpse callback of a per-ID quota lock.  After the common glimpse
 * processing it looks up the lquota entry attached to the lock.  When the
 * descriptor carries a non-zero qunit different from the entry's, it
 * installs the new qunit and computes how much granted space could be
 * given back, then either advertises it (lvb_id_may_rel) or releases it
 * in the reply (lvb_id_rel) and schedules an update of lqe_granted.
 * Finally the edquot state is refreshed from gl_flags.
 * NOTE(review): several lines (the rc/space declarations, the locking
 * calls paired with lqe_write_unlock(), the test on space, the else
 * keyword, the condition on wake_up(), reference release, labels and
 * return) are not visible in this listing.
 */
449 static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
451 struct ptlrpc_request *req = data;
452 struct lquota_entry *lqe;
453 struct ldlm_gl_lquota_desc *desc;
454 struct lquota_lvb *lvb;
/* common part: extract the descriptor and prepare the reply LVB */
460 rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
/* fetch the entry with reset == false, so l_ast_data is left in place */
464 lqe = qsd_id_ast_data_get(lock, false);
467 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
469 LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu, edquot:%d",
470 desc->gl_qunit, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
/* a non-zero qunit that differs from the current one is a qunit change */
474 if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
477 /* extract new qunit from glimpse request */
478 qsd_set_qunit(lqe, desc->gl_qunit);
/*
 * space = granted - pending release - usage - pending and waiting
 * writes - one (new) qunit
 */
480 space = lqe->lqe_granted - lqe->lqe_pending_rel;
481 space -= lqe->lqe_usage;
482 space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
483 space -= lqe->lqe_qunit;
/* a request is already in flight: only report what may be released */
486 if (lqe->lqe_pending_req > 0) {
488 "request in flight, postpone release of %lld",
490 lvb->lvb_id_may_rel = space;
/*
 * presumably the no-request-in-flight branch: count this release as a
 * pending request for the duration of the update
 */
492 lqe->lqe_pending_req++;
494 /* release quota space in glimpse reply */
495 LQUOTA_DEBUG(lqe, "releasing %lld", space);
496 lqe->lqe_granted -= space;
497 lvb->lvb_id_rel = space;
/* the entry's write lock is released before scheduling the update */
499 lqe_write_unlock(lqe);
500 /* change the lqe_granted */
501 qsd_upd_schedule(lqe2qqi(lqe), lqe,
504 &lqe->lqe_granted, 0, false);
507 lqe->lqe_pending_req--;
/* refresh the edquot state from the LQUOTA_FL_EDQUOT flag */
513 qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
514 lqe_write_unlock(lqe);
517 wake_up(&lqe->lqe_waiters);
525 * Check whether a slave already owns an ldlm lock for the quota identifier \qid.
527 * \param lockh - is the local lock handle from lquota entry.
528 * \param rlockh - is the remote lock handle of the matched lock, if any.
530 * \retval 0 : on successful look up and \lockh contains the lock handle.
531 * \retval -ENOENT: no lock found
/*
 * Try to take a reference on the lock behind \lockh in the mode stored in
 * qsd_id_einfo.ei_mode and, when the caller wants it, copy the lock's
 * remote handle into \rlockh.
 * NOTE(review): the rc declaration, the statements executed when the
 * handle is unused or the addref fails, the test on \rlockh and the
 * return statements are not visible in this listing.
 */
533 int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
535 struct ldlm_lock *lock;
/* an unused handle means there is no lock to look at */
542 if (!lustre_handle_is_used(lockh))
/* the reference is taken in the per-ID enqueue mode */
545 rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
549 LASSERT(lustre_handle_is_used(lockh));
550 ldlm_lock_dump_handle(D_QUOTA, lockh);
553 /* caller not interested in remote handle */
557 * look up lock associated with local handle and extract remote handle
558 * to be packed in quota request
/* the lookup is asserted to succeed */
560 lock = ldlm_handle2lock(lockh);
561 LASSERT(lock != NULL);
562 lustre_handle_copy(rlockh, &lock->l_remote_handle);
/*
 * Give up the per-ID lock recorded in an lquota entry.  The entry's lock
 * handle is copied into the per-thread qti_lockh; if that handle is in
 * use, the entry's handle, qunit and edquot state are reset.  The copied
 * handle is then matched via qsd_id_lock_match() and passed to
 * ldlm_lock_decref_and_cancel().
 * NOTE(review): the rc declaration, the locking call paired with
 * lqe_write_unlock(), the early-exit statement, the check on the match
 * result and the end of the function are not visible in this listing.
 */
568 int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
570 struct qsd_thread_info *qti = qsd_info(env);
/*
 * entries with pending or waiting writes, usage or granted space are only
 * unlocked here (presumably followed by an early return; not visible)
 */
576 if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
577 lqe->lqe_usage || lqe->lqe_granted) {
578 lqe_write_unlock(lqe);
/* work on a private copy of the handle so the entry's can be cleared */
582 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
583 if (lustre_handle_is_used(&qti->qti_lockh)) {
584 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
585 qsd_set_qunit(lqe, 0);
586 qsd_set_edquot(lqe, false);
588 lqe_write_unlock(lqe);
/* NULL: the remote handle is not needed here */
590 rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
/* drop a reference in the per-ID mode and cancel the lock */
594 ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);