4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2012 Intel, Inc.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_LQUOTA
37 #include <linux/version.h>
39 #include <asm/unistd.h>
40 #include <linux/quotaops.h>
41 #include <linux/init.h>
43 #include <obd_class.h>
44 #include <lustre_param.h>
45 #include <lprocfs_status.h>
47 #include "qsd_internal.h"
50 * helper function returning how much space is currently reserved for requests in flight.
53 static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
58 pending = lqe->lqe_pending_req;
65 * helper function returning true when the connection to the master is ready to be used.
68 static inline int qsd_ready(struct qsd_instance *qsd)
70 struct obd_import *imp = NULL;
72 cfs_read_lock(&qsd->qsd_lock);
73 if (qsd->qsd_exp_valid)
74 imp = class_exp2cliimp(qsd->qsd_exp);
75 cfs_read_unlock(&qsd->qsd_lock);
77 return (imp == NULL || imp->imp_invalid) ? false : true;
81 * Helper function returning true when quota space needs to be adjusted (some
82 * unused space should be released or more space pre-acquired) and false otherwise.
84 static bool qsd_adjust_needed(struct lquota_entry *lqe)
86 struct qsd_qtype_info *qqi;
91 if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
92 /* if quota isn't enforced for this id, no need to adjust.
93 * Similarly, no need to perform adjustment if the target is in
94 * the process of shutting down. */
97 usage = lqe->lqe_usage;
98 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
99 granted = lqe->lqe_granted - lqe->lqe_pending_rel;
101 /* need to re-acquire per-ID lock or release all grant */
102 if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
103 lqe->lqe_granted > lqe->lqe_usage)
106 /* good old quota qunit adjustment logic which has been around since
108 * 1. Need to release some space? */
109 if (granted > usage + lqe->lqe_qunit)
112 /* 2. Any quota overrun? */
113 if (lqe->lqe_usage > lqe->lqe_granted)
114 /* we ended up consuming more than we own, we need to have this
118 /* 3. Time to pre-acquire? */
119 if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
120 granted < usage + lqe->lqe_qtune)
121 /* need to pre-acquire some space if we don't want to block
122 * client's requests */
129 * Callback function called when an acquire/release request sent to the master
132 static void qsd_dqacq_completion(const struct lu_env *env,
133 struct qsd_qtype_info *qqi,
134 struct quota_body *reqbody,
135 struct quota_body *repbody,
136 struct lustre_handle *lockh,
137 union ldlm_wire_lvb *lvb,
140 struct lquota_entry *lqe = (struct lquota_entry *)arg;
141 struct qsd_thread_info *qti;
143 bool adjust = false, cancel = false;
146 LASSERT(qqi != NULL && lqe != NULL);
148 /* environment passed by ptlrpcd is mostly used by CLIO and doesn't have the
150 rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
152 LQUOTA_ERROR(lqe, "failed to refill environment %d", rc);
154 /* can't afford to adjust quota space with no suitable lu_env */
155 GOTO(out_noadjust, rc);
161 if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
162 LQUOTA_ERROR(lqe, "DQACQ failed with %d, op:%x", ret,
167 /* despite -EDQUOT & -EINPROGRESS errors, the master might still
168 * grant us back quota space to adjust quota overrun */
170 LQUOTA_DEBUG(lqe, "DQACQ returned %d", ret);
172 /* Set the lqe_lockh */
173 if (lustre_handle_is_used(lockh) &&
174 !lustre_handle_equal(lockh, &lqe->lqe_lockh))
175 lustre_handle_copy(&lqe->lqe_lockh, lockh);
177 /* If the replied qb_count is zero, it means the master didn't process
178 * the DQACQ since the limit for this ID has been removed, so we
179 * should not update the quota entry & slave index copy either. */
180 if (repbody != NULL && repbody->qb_count != 0) {
181 LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
183 if (req_is_rel(reqbody->qb_flags)) {
184 if (lqe->lqe_granted < repbody->qb_count) {
185 LQUOTA_ERROR(lqe, "can't release more space "
186 "than owned "LPU64"<"LPU64,
189 lqe->lqe_granted = 0;
191 lqe->lqe_granted -= repbody->qb_count;
193 /* Proactively cancel the per-ID lock when there
194 * isn't any usage & grant, which avoids the master
195 * sending glimpses unnecessarily to this slave on
197 if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
198 !lqe->lqe_waiting_write && !lqe->lqe_usage)
201 lqe->lqe_granted += repbody->qb_count;
203 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
204 lqe_write_unlock(lqe);
206 /* Update the slave index file in the dedicated thread. So far,
207 * we don't update the version of the slave index copy on DQACQ.
208 * No locking is necessary since nobody can change
209 * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
210 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
215 /* extract information from lvb */
216 if (ret == 0 && lvb != NULL) {
217 if (lvb->l_lquota.lvb_id_qunit != 0)
218 qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
219 if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
220 lqe->lqe_edquot = true;
222 lqe->lqe_edquot = false;
223 } else if (repbody != NULL && repbody->qb_qunit != 0) {
224 qsd_set_qunit(lqe, repbody->qb_qunit);
227 /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
228 * flooding the master with acquire requests. Pre-acquire will be turned
229 * on again as soon as qunit is modified */
230 if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
231 lqe->lqe_nopreacq = true;
233 adjust = qsd_adjust_needed(lqe);
235 lqe->lqe_pending_req--;
236 lqe->lqe_pending_rel = 0;
237 lqe_write_unlock(lqe);
239 cfs_waitq_broadcast(&lqe->lqe_waiters);
241 /* release reference on per-ID lock */
242 if (lustre_handle_is_used(lockh))
243 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
246 qsd_adjust_schedule(lqe, false, true);
248 if (!ret || ret == -EDQUOT)
249 qsd_adjust_schedule(lqe, false, false);
251 qsd_adjust_schedule(lqe, true, false);
255 /* free lvb allocated in qsd_dqacq */
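/*
 * Try to consume local quota space, i.e. space already granted to this slave
 * for the given ID; on success, \a space is moved from lqe_waiting_write to
 * lqe_pending_write.
 */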
262 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
268 if (!lqe->lqe_enforced)
269 /* not enforced any more, we are good */
273 /* use latest usage */
274 usage = lqe->lqe_usage;
275 /* take pending write into account */
276 usage += lqe->lqe_pending_write;
278 if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
279 /* Yay! we got enough space */
280 lqe->lqe_pending_write += space;
281 lqe->lqe_waiting_write -= space;
283 } else if (lqe->lqe_edquot) {
288 lqe_write_unlock(lqe);
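/*
 * Compute how much quota space should be acquired from, or released to, the
 * master for operation \a op and fill the qb_count/qb_flags fields of the
 * quota body accordingly. Return true if a request actually needs to be sent
 * to the master.
 */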
293 static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op,
294 struct quota_body *qbody)
296 struct qsd_qtype_info *qqi;
297 __u64 usage, granted;
299 if (!lqe->lqe_enforced && op != QSD_REL)
304 LASSERT(lqe->lqe_pending_rel == 0);
305 usage = lqe->lqe_usage;
306 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
307 granted = lqe->lqe_granted;
313 /* if we overconsumed quota space, we report usage in request
314 * so that master can adjust it unconditionally */
315 if (lqe->lqe_usage > lqe->lqe_granted) {
316 qbody->qb_usage = lqe->lqe_usage;
317 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
318 granted = lqe->lqe_usage;
320 /* acquire as much as needed, but not more */
321 if (usage > granted) {
322 qbody->qb_count = usage - granted;
323 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
327 /* When reporting quota (during reintegration or on setquota
328 * glimpse), we should release granted space if usage is 0.
329 * Otherwise, if the usage is less than granted, we need to
330 * acquire the per-ID lock to make sure the unused grant can be
331 * reclaimed by per-ID lock glimpse. */
332 if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
333 LQUOTA_DEBUG(lqe, "Release on report!");
334 GOTO(again, op = QSD_REL);
335 } else if (lqe->lqe_usage == lqe->lqe_granted) {
336 LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
337 "anything on report!");
338 } else if (lqe->lqe_usage < lqe->lqe_granted) {
339 LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
341 qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
343 LASSERT(lqe->lqe_usage > lqe->lqe_granted);
344 LQUOTA_DEBUG(lqe, "Reporting usage");
345 qbody->qb_usage = lqe->lqe_usage;
346 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
350 /* release unused quota space unconditionally */
351 if (lqe->lqe_granted > lqe->lqe_usage) {
352 qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
353 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
357 /* need to re-acquire per-ID lock or release all grant */
358 if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
359 lqe->lqe_granted > lqe->lqe_usage)
360 GOTO(again, op = QSD_REP);
362 /* release spare grant */
363 if (granted > usage + lqe->lqe_qunit) {
364 /* pre-release quota space */
365 qbody->qb_count = granted - usage;
366 /* if usage == 0, release all granted space */
368 /* try to keep one qunit of quota space */
369 qbody->qb_count -= lqe->lqe_qunit;
370 /* but don't release less than qtune to avoid
371 * releasing space too often */
372 if (qbody->qb_count < lqe->lqe_qtune)
373 qbody->qb_count = lqe->lqe_qtune;
375 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
379 /* if we overconsumed quota space, we report usage in request
380 * so that master can adjust it unconditionally */
381 if (lqe->lqe_usage > lqe->lqe_granted) {
382 qbody->qb_usage = lqe->lqe_usage;
383 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
384 granted = lqe->lqe_usage;
387 if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
388 lustre_handle_is_used(&lqe->lqe_lockh) &&
389 lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
390 /* To pre-acquire quota space, we report how much spare
391 * quota space the slave currently owns, then the master
392 * will grant us back how much we are entitled to given the
393 * current state of affairs */
394 if (granted <= usage)
397 qbody->qb_count = granted - usage;
398 qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
403 CERROR("Invalid qsd operation:%u\n", op);
407 return qbody->qb_flags != 0;
411 * Acquire/release quota space from master.
412 * There is at most one in-flight dqacq/dqrel request per quota ID.
414 * \param env - the environment passed by the caller
415 * \param lqe - is the qid entry to be processed
416 * \param op - operation the caller wants to perform
418 * \retval 0 - success
419 * \retval -EDQUOT : out of quota
420 * -EINPROGRESS : inform client to retry write/create
421 * -ve : other appropriate errors
423 int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
426 struct qsd_thread_info *qti = qsd_info(env);
427 struct quota_body *qbody = &qti->qti_body;
428 struct qsd_instance *qsd;
429 struct qsd_qtype_info *qqi;
430 struct ldlm_lock *lock;
432 bool intent = false, sync;
438 if (qsd->qsd_stopping) {
439 LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
440 /* Target is about to shut down, client will retry */
441 RETURN(-EINPROGRESS);
444 if (!qsd_ready(qsd)) {
445 LQUOTA_DEBUG(lqe, "Connection to master not ready");
449 /* In most cases, reintegration must have been triggered (when enabling
450 * quota or on OST start); however, in a rare race condition (enabling
451 * quota when starting OSTs), we might miss triggering reintegration
454 * If the previous reintegration failed for some reason, we'll
455 * re-trigger it here as well. */
456 if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
457 LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
458 " off reintegration");
459 qsd_start_reint_thread(qqi);
460 RETURN(-EINPROGRESS);
463 LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);
465 /* Fill the remote global lock handle; the master will check this handle
466 * to see if the slave is sending requests with a stale lock */
467 cfs_read_lock(&qsd->qsd_lock);
468 lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
469 cfs_read_unlock(&qsd->qsd_lock);
471 if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
474 lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
477 lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
480 /* We allow only one in-flight dqacq/dqrel for a given qid; if
481 * there is already an in-flight dqacq/dqrel:
483 * - For QSD_ADJ: we should just abort it, since the local limit is going
484 * to be changed soon;
485 * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel to
486 * finish, and return success to the caller. The caller is
487 * responsible for retrying;
488 * - For QSD_REP: we should just abort it, since the slave has already
489 * acquired/released the grant; */
490 sync = (op == QSD_ACQ || op == QSD_REL) ? true : false;
491 LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d",
492 lqe->lqe_pending_req);
495 if (lqe->lqe_pending_req != 0) {
496 struct l_wait_info lwi = { 0 };
498 lqe_write_unlock(lqe);
500 LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op);
504 LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel");
505 l_wait_event(lqe->lqe_waiters,
506 !lqe_pending_dqacq(lqe) || qsd->qsd_stopping,
511 /* fill qb_count & qb_flags */
512 if (!qsd_calc_space(lqe, op, qbody)) {
513 lqe_write_unlock(lqe);
514 LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op);
517 lqe->lqe_pending_req++;
518 lqe_write_unlock(lqe);
520 /* fill other quota body fields */
521 qbody->qb_fid = qqi->qqi_fid;
522 qbody->qb_id = lqe->lqe_id;
523 memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh));
524 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
526 /* hold a refcount until completion */
529 if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
530 /* check whether we already own a lock for this ID */
532 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
533 lqe_read_unlock(lqe);
535 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
537 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
538 if (req_is_preacq(qbody->qb_flags)) {
539 if (req_has_rep(qbody->qb_flags))
540 /* still want to report usage */
541 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
543 /* no pre-acquire if no per-ID lock */
544 GOTO(out, rc = -ENOLCK);
546 /* no lock found, should use intent */
549 } else if (req_is_acq(qbody->qb_flags) &&
550 qbody->qb_count == 0) {
551 /* found cached lock, no need to acquire */
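/* when no intent is needed, send a plain quota request below; the intent
 * path (qsd_intent_lock) packs the request in an IT_QUOTA_DQACQ lock
 * enqueue with a lvb to carry back qunit & edquot updates from the master */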
557 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync,
558 qsd_dqacq_completion, qqi, &qti->qti_lockh,
561 union ldlm_wire_lvb *lvb;
565 GOTO(out, rc = -ENOMEM);
567 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync,
568 IT_QUOTA_DQACQ, qsd_dqacq_completion,
569 qqi, lvb, (void *)lqe);
571 /* the completion function will be called by qsd_send_dqacq or
575 qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
581 * Quota enforcement handler. If local quota can satisfy this operation,
582 * return success, otherwise, acquire more quota from master.
583 * (for a write operation, if the master isn't available at the moment, return
584 * -EINPROGRESS to inform the client to retry the write)
586 * \param env - the environment passed by the caller
587 * \param qsd - is the qsd instance associated with the device in charge
589 * \param qid - is the qid information attached in the transaction handle
590 * \param space - is the space required by the operation
591 * \param flags - if the operation is a write, the over-quota (user/group)
592 * and sync commit flags are returned to the caller
594 * \retval 0 - success
595 * \retval -EDQUOT : out of quota
596 * -EINPROGRESS : inform client to retry write
597 * -ve : other appropriate errors
599 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
600 struct lquota_id_info *qid, long long space,
603 struct lquota_entry *lqe;
604 int rc = 0, retry_cnt;
607 if (qid->lqi_qentry != NULL) {
608 /* we already had to deal with this id for this transaction */
609 lqe = qid->lqi_qentry;
610 if (!lqe->lqe_enforced)
613 /* look up lquota entry associated with qid */
614 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
616 RETURN(PTR_ERR(lqe));
617 if (!lqe->lqe_enforced) {
621 qid->lqi_qentry = lqe;
622 /* lqe will be released in qsd_op_end() */
626 /* when space is negative or zero, we don't need to consume
627 * quota space. That said, we still want to perform space
628 * adjustments in qsd_op_end, so we return here, but with
629 * a reference on the lqe */
631 rc = qsd_refresh_usage(env, lqe);
637 LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
640 lqe->lqe_waiting_write += space;
641 lqe_write_unlock(lqe);
643 for (retry_cnt = 0; rc == 0; retry_cnt++) {
644 /* refresh disk usage if required */
645 rc = qsd_refresh_usage(env, lqe);
649 /* try to consume local quota space */
650 rc = qsd_acquire_local(lqe, space);
652 /* rc == 0, Woohoo! enough local quota space
653 * rc < 0, something bad happened */
656 /* need to acquire more quota space from master, this is done
658 rc = qsd_dqacq(env, lqe, QSD_ACQ);
659 LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
664 qid->lqi_space += space;
666 LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);
669 lqe->lqe_waiting_write -= space;
671 if (flags && lqe->lqe_pending_write != 0)
672 /* Inform OSD layer that there are pending writes.
673 * It might want to retry after a sync if appropriate */
674 *flags |= QUOTA_FL_SYNC;
675 lqe_write_unlock(lqe);
677 /* convert recoverable error into -EINPROGRESS, and client will
678 * retry write on -EINPROGRESS. */
679 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
680 rc == -EAGAIN || rc == -EINTR)
686 LASSERT(qid->lqi_is_blk);
688 *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
693 usage = lqe->lqe_usage;
694 usage += lqe->lqe_pending_write;
695 usage += lqe->lqe_waiting_write;
696 usage += qqi->qqi_qsd->qsd_sync_threshold;
698 /* if we should notify client to start sync write */
699 if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
700 *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
702 *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
703 lqe_read_unlock(lqe);
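/*
 * helper function returning true when the two IDs have the same quota type
 * and refer to the same uid/gid
 */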
709 static inline bool qid_equal(struct lquota_id_info *q1,
710 struct lquota_id_info *q2)
712 if (q1->lqi_type != q2->lqi_type)
714 return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
718 * Enforce quota; it's called in the declaration phase of each operation.
719 * qsd_op_end() will then be called later once all the operations have been
720 * completed in order to release/adjust the quota space.
722 * \param env - the environment passed by the caller
723 * \param qsd - is the qsd instance associated with the device in charge
725 * \param trans - is the quota transaction information
726 * \param qi - qid & space required by current operation
727 * \param flags - if the operation is a write, the over-quota (user/group)
728 * and sync commit flags are returned to the caller
730 * \retval 0 - success
731 * \retval -EDQUOT : out of quota
732 * -EINPROGRESS : inform client to retry write
733 * -ve : other appropriate errors
735 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
736 struct lquota_trans *trans, struct lquota_id_info *qi,
739 struct qsd_qtype_info *qqi;
744 if (unlikely(qsd == NULL))
747 /* We don't enforce quota until the qsd_instance is started */
748 cfs_read_lock(&qsd->qsd_lock);
749 if (!qsd->qsd_started) {
750 cfs_read_unlock(&qsd->qsd_lock);
753 cfs_read_unlock(&qsd->qsd_lock);
755 /* ignore block quota on MDTs, ignore inode quota on OSTs */
756 if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
757 (qsd->qsd_is_md && qi->lqi_is_blk))
760 qqi = qsd->qsd_type_array[qi->lqi_type];
762 /* ignore quota enforcement request when:
763 * - quota isn't enforced for this quota type
764 * or - we failed to access the accounting object for this quota type
765 * or - the space to acquire is zero
766 * or - the user/group is root */
767 if (!qsd_type_enabled(qsd, qi->lqi_type) || qqi->qqi_acct_obj == NULL ||
768 qi->lqi_id.qid_uid == 0)
771 LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
773 /* check whether we already allocated a slot for this id */
774 for (i = 0; i < trans->lqt_id_cnt; i++) {
775 if (qid_equal(qi, &trans->lqt_ids[i])) {
777 /* make sure we are not mixing inodes & blocks */
778 LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
784 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
785 CERROR("%s: more than %d qids enforced for a "
786 "transaction?\n", qsd->qsd_svname, i);
791 trans->lqt_ids[i].lqi_id = qi->lqi_id;
792 trans->lqt_ids[i].lqi_type = qi->lqi_type;
793 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
797 /* manage quota enforcement for this ID */
798 rc = qsd_op_begin0(env, qqi, &trans->lqt_ids[i], qi->lqi_space, flags);
802 EXPORT_SYMBOL(qsd_op_begin);
805 * Post quota operation, pre-acquire/release quota from master.
807 * \param env - the environment passed by the caller
808 * \param qsd - is the qsd instance attached to the OSD device which
809 * is handling the operation.
810 * \param qqi - is the qsd_qtype_info structure associated with the quota ID
811 * subject to the operation
812 * \param qid - stores information related to this ID for the operation
813 * which has just completed
818 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
819 struct lquota_id_info *qid)
821 struct lquota_entry *lqe;
825 lqe = qid->lqi_qentry;
828 qid->lqi_qentry = NULL;
830 /* refresh cached usage if a suitable environment is passed */
832 qsd_refresh_usage(env, lqe);
835 if (qid->lqi_space > 0)
836 lqe->lqe_pending_write -= qid->lqi_space;
838 adjust = qsd_adjust_needed(lqe);
841 lqe_write_unlock(lqe);
844 /* pre-acquire/release quota space is needed */
846 qsd_dqacq(env, lqe, QSD_ADJ);
848 /* no suitable environment, handle adjustment in
849 * separate thread context */
850 qsd_adjust_schedule(lqe, false, false);
857 * Post quota operation. It's called after the transaction of each operation has stopped.
859 * \param env - the environment passed by the caller
860 * \param qsd - is the qsd instance associated with device which is handling
862 * \param trans - the quota transaction holding the quota IDs & space involved
863 * in the operation
868 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
869 struct lquota_trans *trans)
874 if (unlikely(qsd == NULL))
877 /* We don't enforce quota until the qsd_instance is started */
878 cfs_read_lock(&qsd->qsd_lock);
879 if (!qsd->qsd_started) {
880 cfs_read_unlock(&qsd->qsd_lock);
883 cfs_read_unlock(&qsd->qsd_lock);
885 LASSERT(trans != NULL);
887 for (i = 0; i < trans->lqt_id_cnt; i++) {
888 struct qsd_qtype_info *qqi;
890 if (trans->lqt_ids[i].lqi_qentry == NULL)
893 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
894 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
897 /* reset id_count to 0 so that a second accidental call to qsd_op_end()
898 * does not result in failure */
899 trans->lqt_id_cnt = 0;
902 EXPORT_SYMBOL(qsd_op_end);
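/*
 * Check whether quota space for the given ID needs to be adjusted (unused
 * grant released or space pre-acquired) and, if so, send a QSD_ADJ request
 * to the master.
 */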
904 void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
905 union lquota_id *qid, int qtype)
907 struct lquota_entry *lqe;
908 struct qsd_qtype_info *qqi;
912 if (unlikely(qsd == NULL))
915 /* We don't enforce quota until the qsd_instance is started */
916 cfs_read_lock(&qsd->qsd_lock);
917 if (!qsd->qsd_started) {
918 cfs_read_unlock(&qsd->qsd_lock);
921 cfs_read_unlock(&qsd->qsd_lock);
923 qqi = qsd->qsd_type_array[qtype];
926 if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
930 cfs_read_lock(&qsd->qsd_lock);
931 if (!qsd->qsd_started) {
932 cfs_read_unlock(&qsd->qsd_lock);
935 cfs_read_unlock(&qsd->qsd_lock);
937 lqe = lqe_locate(env, qqi->qqi_site, qid);
939 CERROR("%s: failed to locate lqe for id:"LPU64", type:%d\n",
940 qsd->qsd_svname, qid->qid_uid, qtype);
944 qsd_refresh_usage(env, lqe);
947 adjust = qsd_adjust_needed(lqe);
948 lqe_read_unlock(lqe);
951 qsd_dqacq(env, lqe, QSD_ADJ);
956 EXPORT_SYMBOL(qsd_adjust_quota);