/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02110-1301, USA
 */
/*
 * Copyright (c) 2012, 2017, Intel Corporation.
 * Use is subject to license terms.
 *
 * Author: Johann Lombardi <johann.lombardi@intel.com>
 * Author: Niu Yawei <yawei.niu@intel.com>
 */
#define DEBUG_SUBSYSTEM S_LQUOTA

#include <linux/kthread.h>

#include <lustre_dlm.h>
#include <lustre_swab.h>
#include <obd_class.h>

#include "qmt_internal.h"
/* intent policy function called from mdt_intent_opc() when the intent is of
 * quota type */
int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
		      struct ptlrpc_request *req, struct ldlm_lock **lockp,
	struct qmt_device	*qmt = lu2qmt_dev(ld);
	struct ldlm_intent	*it;
	struct quota_body	*reqbody;
	struct quota_body	*repbody;
	struct obd_uuid		*uuid;
	struct lquota_lvb	*lvb;
	struct ldlm_resource	*res = (*lockp)->l_resource;
	struct ldlm_reply	*ldlm_rep;

	req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     ldlm_lvbo_size(*lockp));
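
	/* the reply buffer for the LVB is reserved here at its maximum
	 * possible size; once it has actually been filled (see the
	 * ldlm_lvbo_fill() call below) it is shrunk to the number of bytes
	 * really written */
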
	/* extract quota body and intent opc */
	it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	if (it == NULL)
		RETURN(err_serious(-EFAULT));

	reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
	if (reqbody == NULL)
		RETURN(err_serious(-EFAULT));

	rc = req_capsule_server_pack(&req->rq_pill);
	if (rc != 0) {
		CERROR("Can't pack response, rc %d\n", rc);
		RETURN(err_serious(rc));
	}

	repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
	if (repbody == NULL)
		RETURN(err_serious(-EFAULT));

	ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (ldlm_rep == NULL)
		RETURN(err_serious(-EFAULT));

	uuid = &(*lockp)->l_export->exp_client_uuid;
	case IT_QUOTA_DQACQ: {
		struct lquota_entry	*lqe;
		struct ldlm_lock	*lock;

		if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
			/* acquire on global lock? something is wrong ... */
			GOTO(out, rc = -EPROTO);

		/* verify global lock isn't stale */
		if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
			GOTO(out, rc = -ENOLCK);

		lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
		if (lock == NULL)
			GOTO(out, rc = -ENOLCK);

		stype = qmt_uuid2idx(uuid, &idx);
		if (stype < 0)
			GOTO(out, rc = -EINVAL);

		/* TODO: it seems we don't need to get lqe from
		 * lq_lvb_data anymore ... And do extra get
		 * and put on it. */
		lqe = res->lr_lvb_data;
		LASSERT(lqe != NULL);

		rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
					  lqe_qtype(lqe), &reqbody->qb_id,

		/* acquire quota space */
		rc = qmt_dqacq0(env, qmt, uuid,
				reqbody->qb_flags, reqbody->qb_count,
				reqbody->qb_usage, repbody,
				qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
		/* new connection from slave */

		if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
			/* connection on per-ID lock? something is wrong ... */
			GOTO(out, rc = -EPROTO);

		rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
				       &repbody->qb_slv_fid,
				       &repbody->qb_slv_ver, uuid);

		CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname,
		       it->opc);
		GOTO(out, rc = -EINVAL);
	/* on success, pack lvb in reply */
	lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
	lvb_len = ldlm_lvbo_size(*lockp);
	lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len);
	if (lvb_len < 0)
		GOTO(out, rc = lvb_len);

	req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);

	ldlm_rep->lock_policy_res2 = clear_serious(rc);
/*
 * Initialize quota LVB associated with quota indexes.
 * Called with res->lr_lvb_sem held
 */
int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
{
	struct qmt_thread_info	*qti;
	struct qmt_device	*qmt = lu2qmt_dev(ld);
	int			 pool_type, qtype;

	LASSERT(res != NULL);
	if (res->lr_type != LDLM_PLAIN)

	if (res->lr_lvb_data ||
	    res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)

	/* extract global index FID and quota identifier */
	fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);

	/* sanity check the global index FID */
	rc = lquota_extract_fid(&qti->qti_fid, &pool_type, &qtype);
		CERROR("can't extract glb index information from FID "DFID"\n",
		       PFID(&qti->qti_fid));
	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
		/* no ID quota lock is associated with UID/GID 0 or with a
		 * seq of 0; we are thus dealing with an ID lock. */
		struct qmt_pool_info	*pool;
		struct lquota_entry	*lqe;
		struct lqe_glbl_data	*lgd;

		pool = qmt_pool_lookup_glb(env, qmt, pool_type);
			GOTO(out, rc = -ENOMEM);

		/* Find the quota entry associated with the quota id */
		lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
			qpi_putref(env, pool);
			GOTO(out, rc = PTR_ERR(lqe));

		/* TODO: need something like qmt_extend_lqe_gd that has
		 * to be called each time qpi_slv_nr is incremented */
		lgd = qmt_alloc_lqe_gd(pool, qtype);
			qpi_putref(env, pool);
			GOTO(out, rc = -ENOMEM);

		qmt_setup_lqe_gd(env, qmt, lqe, lgd, pool_type);

		/* store reference to lqe in lr_lvb_data */
		res->lr_lvb_data = lqe;
		qpi_putref(env, pool);
		LQUOTA_DEBUG(lqe, "initialized res lvb");
	} else {
		struct dt_object	*obj;

		/* lookup global index */
		obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
		if (IS_ERR(obj))
			GOTO(out, rc = PTR_ERR(obj));
		if (!dt_object_exists(obj)) {
			dt_object_put(env, obj);
			GOTO(out, rc = -ENOENT);
		}

		/* store reference to global index object in lr_lvb_data */
		res->lr_lvb_data = obj;
		CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
	}

	res->lr_lvb_len = sizeof(struct lquota_lvb);
/* clear lge_qunit_nu/lge_edquot_nu flags -
 * the slave has received the new qunit and edquot values.
 *
 * \retval true if revoke is needed, i.e. the qunit
 *	   for this slave has reached least_qunit */
static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
{
	unsigned long least = lqe2qpi(lqe)->qpi_least_qunit;

	/* There is no array to store lge for the case of DOM.
	 * Ignore it until MDT pools are ready.
	 */
	if (!qmt_dom(lqe_rtype(lqe), stype)) {
		struct lqe_glbl_data *lgd;

		mutex_lock(&lqe->lqe_glbl_data_lock);
		lgd = lqe->lqe_glbl_data;
			int lge_idx = qmt_map_lge_idx(lgd, idx);

			lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0;
			lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0;
			/* We shouldn't call revoke for the DOM case; it will
			 * be updated at qmt_id_lock_glimpse.
			 */
			revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least;
		mutex_unlock(&lqe->lqe_glbl_data_lock);
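
	/* for DOM targets the per-target array is skipped entirely (see the
	 * qmt_dom() check above), so 'revoke' keeps its initial value and no
	 * revoke is requested from here */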
static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl,
	unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit;

	if (qmt_dom(lqe_rtype(lqe_gl), stype))

	qti_lqes_write_lock(env);
	mutex_lock(&lqe_gl->lqe_glbl_data_lock);
	if (lqe_gl->lqe_glbl_data) {
		struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;

		lge_idx = qmt_map_lge_idx(lgd, idx);
		if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) {
			struct lquota_entry *lqe;

			for (i = 0; i < qti_lqes_cnt(env); i++) {
				lqe = qti_lqes(env)[i];
					"lge_qunit %llu least_qunit %lu idx %d\n",
					lgd->lqeg_arr[lge_idx].lge_qunit,
				if (lqe->lqe_qunit == least_qunit) {
					lqe->lqe_revoke_time =
					notify |= qmt_adjust_edquot(lqe,
						ktime_get_real_seconds());

	mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
	qti_lqes_write_unlock(env);
/*
 * Update LVB associated with the global quota index.
 * This function is called from the DLM itself after a glimpse callback, in
 * which case a valid ptlrpc request is passed.
 */
int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
		    struct ptlrpc_request *req, int increase_only)
	struct qmt_thread_info	*qti;
	struct qmt_device	*qmt = lu2qmt_dev(ld);
	struct lquota_entry	*lqe;
	struct lquota_lvb	*lvb;
	struct ldlm_lock	*lock;
	struct obd_export	*exp;
	int			 rc = 0, idx, stype;

	LASSERT(res != NULL);

	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
		/* no need to update lvb for global quota locks */

	lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
					  lustre_swab_lquota_lvb);
		CERROR("%s: failed to extract lvb from request\n",

	lqe = res->lr_lvb_data;
	LASSERT(lqe != NULL);
	/* allocate environment */

	/* The request is a glimpse callback which was sent via the
	 * reverse import to the slave. What we care about here is the
	 * export associated with the slave, and req->rq_export is
	 * definitely not what we are looking for.
	 * Therefore we extract the lock from the request argument
	 * and use lock->l_export. */
	lock = ldlm_request_lock(req);
		CERROR("%s: failed to get lock from request!\n",
		GOTO(out, rc = PTR_ERR(lock));

	exp = class_export_get(lock->l_export);
		CERROR("%s: failed to get export from lock!\n",
		GOTO(out, rc = -EFAULT);

	stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
	if (stype < 0)
		GOTO(out_exp, rc = stype);

	need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
	if (lvb->lvb_id_rel == 0) {
		/* nothing to release */
		if (lvb->lvb_id_may_rel != 0)
			/* but might still release later ... */
			lqe->lqe_may_rel += lvb->lvb_id_may_rel;
	}

	if (!need_revoke && lvb->lvb_id_rel == 0)
		GOTO(out_exp, rc = 0);
	rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
				  lqe_qtype(lqe), &lqe->lqe_id, NULL, idx);

	if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
		mutex_lock(&lqe->lqe_glbl_data_lock);
		if (lqe->lqe_glbl_data) {
			qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
		mutex_unlock(&lqe->lqe_glbl_data_lock);

		qmt_id_lock_notify(qmt, lqe);

	if (lvb->lvb_id_rel) {
		LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
			     lvb->lvb_id_rel, lvb->lvb_id_may_rel);

		/* release quota space */
		rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
				QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
				qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
		if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
			"failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
				qti->qti_body.qb_count,
				lvb->lvb_id_rel, rc);

	class_export_put(exp);
/*
 * Report size of lvb to ldlm layer in order to allocate lvb buffer
 * As far as quota locks are concerned, the size is static and is the same
 * for both global and per-ID locks, which share the same lvb format.
 */
int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
{
	return sizeof(struct lquota_lvb);
}
/*
 * Fill request buffer with quota lvb
 */
int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
	struct ldlm_resource	*res = lock->l_resource;
	struct lquota_lvb	*qlvb = lvb;

	LASSERT(res != NULL);

	if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
	    res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
		/* no ID quota lock is associated with UID/GID 0 or with a
		 * seq of 0; we are thus dealing with an ID lock. */
		struct lquota_entry	*lqe = res->lr_lvb_data;
		struct qmt_device	*qmt;
		struct obd_uuid		*uuid;

		uuid = &(lock)->l_export->exp_client_uuid;
		rc = qmt_uuid2idx(uuid, &idx);

		qmt = lu2qmt_dev(ld);
		/* return current qunit value & edquot flags in lvb */
		rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
					  lqe_qtype(lqe), &lqe->lqe_id,
			qlvb->lvb_id_qunit = qti_lqes_min_qunit(env);

			if (qti_lqes_edquot(env))
				qlvb->lvb_flags = LQUOTA_FL_EDQUOT;

		CDEBUG(D_QUOTA, "uuid %s lqe_id %lu, edquot %llu qunit %llu\n",
		       (char *)uuid, (unsigned long)lqe->lqe_id.qid_uid,
		       qlvb->lvb_flags, qlvb->lvb_id_qunit);
	} else {
		/* global quota lock */
		struct dt_object *obj = res->lr_lvb_data;

		/* return current version of global index */
		qlvb->lvb_glb_ver = dt_version_get(env, obj);
	}

	RETURN(rc = rc ?: sizeof(struct lquota_lvb));
/*
 * Free lvb associated with a given ldlm resource
 * we don't really allocate a lvb, lr_lvb_data just points to
 * the appropriate backend structures.
 */
int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
{
	if (res->lr_lvb_data == NULL)

	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
		struct lquota_entry *lqe = res->lr_lvb_data;
		struct lqe_glbl_data *lgd;

		mutex_lock(&lqe->lqe_glbl_data_lock);
		lgd = lqe->lqe_glbl_data;
		lqe->lqe_glbl_data = NULL;
		mutex_unlock(&lqe->lqe_glbl_data_lock);
		qmt_free_lqe_gd(lgd);

		/* release lqe reference */
	} else {
		struct dt_object *obj = res->lr_lvb_data;

		/* release object reference */
		dt_object_put(lu_env_find(), obj);
	}

	res->lr_lvb_data = NULL;
typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, struct lquota_entry *);

struct qmt_gl_lock_array {
	unsigned long		  q_max;	/* allocated slots in q_locks */
	unsigned long		  q_cnt;	/* granted locks stored so far */
	struct ldlm_lock	**q_locks;
};

static void qmt_free_lock_array(struct qmt_gl_lock_array *array)
{
	if (array->q_max == 0) {
		LASSERT(array->q_locks == NULL);

	for (i = 0; i < array->q_cnt; i++) {
		LASSERT(array->q_locks[i]);
		LDLM_LOCK_RELEASE(array->q_locks[i]);
		array->q_locks[i] = NULL;
	}

	OBD_FREE_PTR_ARRAY(array->q_locks, array->q_max);
	array->q_locks = NULL;
static int qmt_alloc_lock_array(struct ldlm_resource *res,
				struct qmt_gl_lock_array *array,
				qmt_glimpse_cb_t cb, void *arg)
{
	struct lquota_entry *lqe = arg;
	struct list_head *pos;
	unsigned long count = 0;

	LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
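
	/* two-pass pattern: the first scan of the granted list only counts
	 * qualifying locks (q_max is still 0, so nothing is stored); if more
	 * locks are found than q_locks can hold, the array is freed, q_max is
	 * grown and the scan is retried so that lock references can be
	 * recorded on the next pass */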
	mutex_lock(&lqe->lqe_glbl_data_lock);

	/* scan list of granted locks */
	list_for_each(pos, &res->lr_granted) {
		struct ldlm_lock *lock;

		lock = list_entry(pos, struct ldlm_lock, l_res_link);
		LASSERT(lock->l_export);

			/* slave should not be notified */

		if (array->q_max != 0 && array->q_cnt < array->q_max) {
			array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock);

	mutex_unlock(&lqe->lqe_glbl_data_lock);

	if (count > array->q_max) {
		qmt_free_lock_array(array);
		/*
		 * allocate more slots in case more qualified locks are
		 * found during the next loop
		 */
		array->q_max = count + count / 2 + 10;

		LASSERT(array->q_locks == NULL && array->q_cnt == 0);
		OBD_ALLOC_PTR_ARRAY(array->q_locks, array->q_max);
		if (array->q_locks == NULL) {
static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
			      struct lquota_entry *lqe)
{
	struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;

	stype = qmt_uuid2idx(uuid, &idx);

	/* DOM case - set global lqe settings */
	if (qmt_dom(lqe_rtype(lqe), stype)) {
		edquot = lqe->lqe_edquot;
		qunit = lqe->lqe_qunit;
	} else {
		struct lqe_glbl_data *lgd;

		mutex_lock(&lqe->lqe_glbl_data_lock);
		lgd = lqe->lqe_glbl_data;
			lge_idx = qmt_map_lge_idx(lgd, idx);
			edquot = lgd->lqeg_arr[lge_idx].lge_edquot;
			qunit = lgd->lqeg_arr[lge_idx].lge_qunit;
			edquot = lqe->lqe_edquot;
			qunit = lqe->lqe_qunit;
		mutex_unlock(&lqe->lqe_glbl_data_lock);
	}

	/* fill glimpse descriptor with lqe settings */
	desc->lquota_desc.gl_flags = edquot ? LQUOTA_FL_EDQUOT : 0;
	desc->lquota_desc.gl_qunit = qunit;
	CDEBUG(D_QUOTA, "setup desc: stype %d idx %d, edquot %llu qunit %llu\n",
	       stype, idx, desc->lquota_desc.gl_flags,
	       desc->lquota_desc.gl_qunit);
/*
 * Send glimpse callback to slaves holding a lock on resource \res.
 * This is used to notify slaves of new quota settings or to claim quota space
 * back.
 *
 * \param env  - is the environment passed by the caller
 * \param qmt  - is the quota master target
 * \param res  - is the dlm resource associated with the quota object
 * \param desc - is the glimpse descriptor to pack in glimpse callback
 * \param cb   - is the callback function called on every lock and determines
 *               whether a glimpse should be issued
 * \param arg  - is an opaque parameter passed to the callback function
 */
static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
			    struct ldlm_resource *res, union ldlm_gl_desc *desc,
			    qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
	union ldlm_gl_desc	*descs = NULL;
	struct list_head	*tmp, *pos;
	struct qmt_gl_lock_array locks;
	unsigned long		 i, locks_count;

	memset(&locks, 0, sizeof(locks));
	rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
		CERROR("%s: failed to allocate glimpse lock array (%d)\n",
		       qmt->qmt_svname, rc);

		CDEBUG(D_QUOTA, "%s: no granted locks to send glimpse\n",

	CDEBUG(D_QUOTA, "found granted locks %lu\n", locks.q_cnt);
	locks_count = locks.q_cnt;
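
	/* the lock count is saved in locks_count because the lock array is
	 * freed before the glimpses are issued, while the descriptor array
	 * allocated below is sized from this count and freed with it at the
	 * very end */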
	/* Use a single desc for all work items when called from
	 * qmt_glb_lock_notify */
	if (cb && locks.q_cnt > 1) {
		/* TODO: think about storing these preallocated descs
		 * in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
		 * The benefit is that we don't need to allocate/free
		 * and set up these descs each time. But the drawback is
		 * memory use (sizeof(ldlm_gl_desc) * OST_COUNT * user_number),
		 * for example 88 * 256 * 10000, i.e. about 225 MB. */
			  sizeof(struct ldlm_gl_lquota_desc) * locks.q_cnt);
			CERROR("%s: alloc glimpse lock array failed: rc = %d\n",
			       qmt->qmt_svname, rc);
			qmt_free_lock_array(&locks);
	for (i = locks.q_cnt; i > 0; i--) {
		struct ldlm_glimpse_work *work;

			CERROR("%s: failed to notify a lock.\n",

			desc = &descs[i - 1];
			qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
			work->gl_interpret_data = lqe;

		list_add_tail(&work->gl_list, &gl_list);
		work->gl_lock = locks.q_locks[i - 1];
		work->gl_desc = desc;

		locks.q_locks[i - 1] = NULL;
	qmt_free_lock_array(&locks);

	if (list_empty(&gl_list)) {
		CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);

	/* issue glimpse callbacks to all connected slaves */
	rc = ldlm_glimpse_locks(res, &gl_list);

	list_for_each_safe(pos, tmp, &gl_list) {
		struct ldlm_glimpse_work *work;

		work = list_entry(pos, struct ldlm_glimpse_work, gl_list);

		list_del(&work->gl_list);
		CERROR("%s: failed to notify %s of new quota settings\n",
		       obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
		LDLM_LOCK_RELEASE(work->gl_lock);

		 sizeof(struct ldlm_gl_lquota_desc) * locks_count);
/*
 * Send glimpse request to all global quota locks to push new quota settings to
 * slaves.
 *
 * \param env - is the environment passed by the caller
 * \param lqe - is the lquota entry which has new settings
 * \param ver - is the version associated with the setting change
 */
void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
	struct qmt_thread_info	*qti = qmt_info(env);
	struct qmt_pool_info	*pool = lqe2qpi(lqe);
	struct ldlm_resource	*res = NULL;

	lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));

	/* send glimpse callback to notify slaves of new quota settings */
	qti->qti_gl_desc.lquota_desc.gl_id = lqe->lqe_id;
	qti->qti_gl_desc.lquota_desc.gl_flags = 0;
	if (lqe->lqe_is_default) {
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
							LQUOTA_FLAG_DEFAULT);
	} else if (lqe->lqe_is_deleted) {
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
							LQUOTA_FLAG_DELETED);
	} else if (lqe->lqe_is_reset) {
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
	} else if (lqe->lqe_granted > lqe->lqe_hardlimit) {
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
	} else {
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
		qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
	}
	qti->qti_gl_desc.lquota_desc.gl_ver = ver;
	/* look up ldlm resource associated with global index */
	fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
	res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, &qti->qti_resid,
		/* this might happen if no slaves have enqueued global quota
		 * locks yet */
		LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
			     "with "DFID, PFID(&qti->qti_fid));

	qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
	ldlm_resource_putref(res);
/* Callback function used to select locks that should be glimpsed when
 * broadcasting the new qunit value */
static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
{
	struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
	struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
	int stype = qmt_uuid2idx(uuid, &idx);

	LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);

	CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n",
	       stype, lqe_rtype(lqe), idx, uuid->uuid);
	/* Quota pools support only OSTs, even though MDTs can also be
	 * registered as LQUOTA_RES_DT devices (DOM). */
	if (qmt_dom(lqe_rtype(lqe), stype))

		int lge_idx = qmt_map_lge_idx(lgd, idx);

			"tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n",
			idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu,
			lgd->lqeg_arr[lge_idx].lge_qunit_nu);
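		/* glimpse only targets whose "not updated" flags are still
		 * set, i.e. slaves that have not yet seen the new
		 * qunit/edquot values */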
		return lgd->lqeg_arr[lge_idx].lge_edquot_nu ||
		       lgd->lqeg_arr[lge_idx].lge_qunit_nu;
/*
 * Send glimpse request on per-ID lock to push new qunit value to slave.
 *
 * \param env  - is the environment passed by the caller
 * \param qmt  - is the quota master target device
 * \param lqe  - is the lquota entry with the new qunit value
 * \param uuid - is the uuid of the slave acquiring space, if any
 */
static void qmt_id_lock_glimpse(const struct lu_env *env,
				struct qmt_device *qmt,
				struct lquota_entry *lqe, struct obd_uuid *uuid)
{
	struct qmt_thread_info	*qti = qmt_info(env);
	struct qmt_pool_info	*pool = lqe2qpi(lqe);
	struct ldlm_resource	*res = NULL;
	if (!lqe->lqe_enforced)

	lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
	fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
	res = ldlm_resource_get(qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0);
		/* this might legitimately happen if slaves haven't had the
		 * opportunity to enqueue quota lock yet. */
		LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
			     "lock "DFID, PFID(&qti->qti_fid));
		if (lqe->lqe_revoke_time == 0 &&
		    lqe->lqe_qunit == pool->qpi_least_qunit)
			lqe->lqe_revoke_time = ktime_get_seconds();
		lqe_write_unlock(lqe);

	/* It is possible to add an lqe a second time while the same lqe
	 * from the first time is still sending a glimpse */
	/* The purpose of glimpse callback on per-ID lock is twofold:
	 * - notify slaves of new qunit value and hope they will release some
	 *   spare quota space in return
	 * - notify slaves that master ran out of quota space and there is no
	 *   need to send acquire request any more until further notice */

	/* TODO: it is not clear how to implement the case below for all lqes
	 * from where slaves will be notified in qmt_glimpse_lock. Because
	 * here we have just the global lqe with an array of OSTs that should
	 * be notified. Theoretically we can find all lqes that include
	 * these OSTs, but it is not trivial. So I would propose to move
	 * this case to another place ... */
	if (lqe->lqe_revoke_time == 0 &&
	    lqe->lqe_qunit == pool->qpi_least_qunit)
		/* reset lqe_may_rel, it will be updated on glimpse callback
		 * replies if needed */
		lqe->lqe_may_rel = 0;

	lqe_write_unlock(lqe);
	/* issue glimpse callback to slaves */
	if (lqe->lqe_glbl_data)
		qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
				 qmt_id_lock_cb, lqe);

	if (lqe->lqe_revoke_time == 0 &&
	    lqe->lqe_qunit == pool->qpi_least_qunit) {
		lqe->lqe_revoke_time = ktime_get_seconds();
		qmt_adjust_edquot(lqe, ktime_get_real_seconds());
	}
	LASSERT(lqe->lqe_gl);

	lqe_write_unlock(lqe);
	ldlm_resource_putref(res);
/*
 * Schedule a glimpse request on per-ID locks to push new qunit value or
 * edquot flag to quota slaves.
 *
 * \param qmt - is the quota master target device
 * \param lqe - is the lquota entry with the new qunit value
 */
void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
{
	LASSERT(lqe->lqe_is_global);

	spin_lock(&qmt->qmt_reba_lock);
	if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
		list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
		if (qmt->qmt_reba_task)
			wake_up_process(qmt->qmt_reba_task);
	}
	spin_unlock(&qmt->qmt_reba_lock);
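
	/* the actual glimpses are sent asynchronously: the lqe is queued on
	 * qmt_reba_list and the rebalance thread woken above walks the list
	 * and calls qmt_id_lock_glimpse() on each queued entry */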
struct qmt_reba_args {
	struct qmt_device	*qra_dev;
	struct lu_env		 qra_env;
	struct completion	*qra_started;
};

#define TASK_IDLE TASK_INTERRUPTIBLE

/*
 * The rebalance thread is in charge of sending glimpse callbacks on per-ID
 * quota locks owned by slaves in order to notify them of:
 * - a qunit shrink, in which case slaves might release quota space back in
 * - set/clear edquot flag used to cache the "quota exhausted" state of the
 *   master. When the flag is set, slaves know that there is no need to
 *   try to acquire quota from the master since the latter has already
 *   distributed all the space.
 */
static int qmt_reba_thread(void *_args)
{
	struct qmt_reba_args	*args = _args;
	struct qmt_device	*qmt = args->qra_dev;
	struct lu_env		*env = &args->qra_env;
	struct lquota_entry	*lqe, *tmp;

	complete(args->qra_started);
	while (({set_current_state(TASK_IDLE);
		 !kthread_should_stop(); })) {
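		/* the task state is set to TASK_IDLE before checking
		 * kthread_should_stop() so that a wake-up from
		 * qmt_id_lock_notify() racing with the check is not lost
		 * before the thread goes back to sleep */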
		spin_lock(&qmt->qmt_reba_lock);
		list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
			__set_current_state(TASK_RUNNING);
			list_del_init(&lqe->lqe_link);
			spin_unlock(&qmt->qmt_reba_lock);

			/* lqe_ref == 1 means we hold the last ref,
			 * so no need to send glimpse callbacks.
			 */
			if (!kthread_should_stop() &&
			    atomic_read(&lqe->lqe_ref) > 1)
				qmt_id_lock_glimpse(env, qmt, lqe, NULL);

			spin_lock(&qmt->qmt_reba_lock);
		spin_unlock(&qmt->qmt_reba_lock);

	__set_current_state(TASK_RUNNING);
/*
 * Start rebalance thread. Called when the QMT is being set up.
 */
int qmt_start_reba_thread(struct qmt_device *qmt)
{
	struct task_struct	*task;
	struct qmt_reba_args	*args;
	DECLARE_COMPLETION_ONSTACK(started);

	OBD_ALLOC_PTR(args);
	args->qra_dev = qmt;
	args->qra_started = &started;

	rc = lu_env_init(&args->qra_env, LCT_MD_THREAD);
		CERROR("%s: failed to init env.\n", qmt->qmt_svname);

	task = kthread_create(qmt_reba_thread, args,
			      "qmt_reba_%s", qmt->qmt_svname);
		CERROR("%s: failed to start rebalance thread (%ld)\n",
		       qmt->qmt_svname, PTR_ERR(task));
		GOTO(out_env_fini, rc = PTR_ERR(task));

	rc = lu_env_add_task(&args->qra_env, task);
		GOTO(out_env_fini, rc);

	qmt->qmt_reba_task = task;
	wake_up_process(task);
	wait_for_completion(&started);

	lu_env_fini(&args->qra_env);
/*
 * Stop rebalance thread. Called when the QMT is about to shut down.
 */
void qmt_stop_reba_thread(struct qmt_device *qmt)
{
	struct task_struct *task;

	spin_lock(&qmt->qmt_reba_lock);
	task = qmt->qmt_reba_task;
	qmt->qmt_reba_task = NULL;
	spin_unlock(&qmt->qmt_reba_lock);

	LASSERT(list_empty(&qmt->qmt_reba_list));