4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2012 Intel, Inc.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_LQUOTA
37 #include <linux/version.h>
39 #include <asm/unistd.h>
40 #include <linux/quotaops.h>
41 #include <linux/init.h>
43 #include <obd_class.h>
44 #include <lustre_param.h>
45 #include <lprocfs_status.h>
47 #include "qsd_internal.h"
50 * helper function returning how much space is currently reserved for requests
/* NOTE(review): only a fragment of this helper is visible in this excerpt;
 * its declarations, locking and return statement are elided. */
53 static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
/* snapshot the number of in-flight acquire/release requests for this entry;
 * presumably sampled under the lqe lock — TODO confirm against full source */
58 pending = lqe->lqe_pending_req;
65 * helper function returning true when the connection to master is ready to be
/* Returns non-zero iff the slave export is flagged valid and its client
 * import to the quota master exists and is not marked invalid. */
68 static inline int qsd_ready(struct qsd_instance *qsd)
70 struct obd_import *imp = NULL;
/* sample qsd_exp_valid/qsd_exp under qsd_lock so the export cannot be
 * torn down underneath us while we convert it to an import */
72 cfs_read_lock(&qsd->qsd_lock);
73 if (qsd->qsd_exp_valid)
74 imp = class_exp2cliimp(qsd->qsd_exp);
75 cfs_read_unlock(&qsd->qsd_lock);
/* no import at all, or an invalidated one, means the master is unreachable */
77 return (imp == NULL || imp->imp_invalid) ? false : true;
81 * Helper function returning true when quota space need to be adjusted (some
82 * unused space should be free or pre-acquire) and false otherwise.
84 static bool qsd_adjust_needed(struct lquota_entry *lqe)
86 struct qsd_qtype_info *qqi;
91 if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
92 /* if quota isn't enforced for this id, no need to adjust
93 * Similarly, no need to perform adjustment if the target is in
94 * the process of shutting down. */
/* projected usage = current usage plus writes already granted locally
 * (pending) plus writes queued waiting for space (waiting) */
97 usage = lqe->lqe_usage;
98 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
99 granted = lqe->lqe_granted - lqe->lqe_pending_rel;
101 /* need to re-acquire per-ID lock or release all grant */
102 if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
103 lqe->lqe_granted > lqe->lqe_usage)
106 /* good old quota qunit adjustment logic which has been around since
108 * 1. Need to release some space? */
109 if (granted > usage + lqe->lqe_qunit)
112 /* 2. Any quota overrun? */
113 if (lqe->lqe_usage > lqe->lqe_granted)
114 /* we ended up consuming more than we own, we need to have this
118 /* 3. Time to pre-acquire? */
/* pre-acquire only when not over quota (lqe_edquot), pre-acquire hasn't
 * been disabled for this entry (lqe_nopreacq) and a qunit is known */
119 if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
120 granted < usage + lqe->lqe_qtune)
121 /* need to pre-acquire some space if we don't want to block
122 * client's requests */
129 * Callback function called when an acquire/release request sent to the master
/* Completion handler for a DQACQ/DQREL RPC: folds the master's reply into the
 * local lquota_entry (granted space, qunit, edquot flag, per-ID lock handle),
 * schedules the slave index update, wakes waiters and, if needed, schedules a
 * follow-up adjustment. \a ret is the RPC/master status; \a lvb may carry
 * updated per-ID lock state. NOTE(review): some lines of this function are
 * elided in this excerpt (locking calls, gotos, epilogue). */
132 static void qsd_dqacq_completion(const struct lu_env *env,
133 struct qsd_qtype_info *qqi,
134 struct quota_body *reqbody,
135 struct quota_body *repbody,
136 struct lustre_handle *lockh,
137 union ldlm_wire_lvb *lvb,
140 struct lquota_entry *lqe = (struct lquota_entry *)arg;
141 struct qsd_thread_info *qti;
143 bool adjust = false, cancel = false;
146 LASSERT(qqi != NULL && lqe != NULL);
148 /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
150 rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
152 LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
154 /* can't afford to adjust quota space with no suitable lu_env */
155 GOTO(out_noadjust, rc);
161 LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
164 /* despite -EDQUOT & -EINPROGRESS errors, the master might still
165 * grant us back quota space to adjust quota overrun */
166 if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
/* connection-type failures are expected during failover/shutdown, so
 * only log genuinely unexpected error codes */
167 if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
168 ret != -ESHUTDOWN && ret != -EAGAIN)
169 /* print errors only if return code is unexpected */
170 LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
171 ret, reqbody->qb_flags);
175 /* Set the lqe_lockh */
176 if (lustre_handle_is_used(lockh) &&
177 !lustre_handle_equal(lockh, &lqe->lqe_lockh))
178 lustre_handle_copy(&lqe->lqe_lockh, lockh);
180 /* If the replied qb_count is zero, it means master didn't process
181 * the DQACQ since the limit for this ID has been removed, so we
182 * should not update quota entry & slave index copy neither. */
183 if (repbody != NULL && repbody->qb_count != 0) {
184 LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
186 if (req_is_rel(reqbody->qb_flags)) {
/* releasing: clamp at zero if the master claims we released
 * more than we think we own (accounting inconsistency) */
187 if (lqe->lqe_granted < repbody->qb_count) {
188 LQUOTA_ERROR(lqe, "can't release more space "
189 "than owned "LPU64"<"LPU64,
192 lqe->lqe_granted = 0;
194 lqe->lqe_granted -= repbody->qb_count;
196 /* Cancel the per-ID lock initiatively when there
197 * isn't any usage & grant, which can avoid master
198 * sending glimpse unnecessarily to this slave on
200 if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
201 !lqe->lqe_waiting_write && !lqe->lqe_usage)
/* acquiring: the master granted us qb_count more units */
204 lqe->lqe_granted += repbody->qb_count;
206 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
207 lqe_write_unlock(lqe);
209 /* Update the slave index file in the dedicated thread. So far,
210 * We don't update the version of slave index copy on DQACQ.
211 * No locking is necessary since nobody can change
212 * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
213 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
218 /* extract information from lvb */
/* NOTE(review): "lvb != 0" compares a pointer against 0; idiomatic C
 * would be "lvb != NULL" — behavior is identical, left untouched here */
219 if (ret == 0 && lvb != 0) {
220 if (lvb->l_lquota.lvb_id_qunit != 0)
221 qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
222 if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
223 lqe->lqe_edquot = true;
225 lqe->lqe_edquot = false;
226 } else if (repbody != NULL && repbody->qb_qunit != 0) {
227 qsd_set_qunit(lqe, repbody->qb_qunit);
230 /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
231 * flooding the master with acquire request. Pre-acquire will be turned
232 * on again as soon as qunit is modified */
233 if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
234 lqe->lqe_nopreacq = true;
236 adjust = qsd_adjust_needed(lqe);
/* this request is no longer in flight; pending release is folded in */
238 lqe->lqe_pending_req--;
239 lqe->lqe_pending_rel = 0;
240 lqe_write_unlock(lqe);
/* wake up any thread blocked in qsd_dqacq() waiting for this request */
242 cfs_waitq_broadcast(&lqe->lqe_waiters);
244 /* release reference on per-ID lock */
245 if (lustre_handle_is_used(lockh))
246 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
249 qsd_adjust_schedule(lqe, false, true);
251 if (!ret || ret == -EDQUOT)
252 qsd_adjust_schedule(lqe, false, false);
254 qsd_adjust_schedule(lqe, true, false);
258 /* free lvb allocated in qsd_dqacq */
/* Try to satisfy a write of \a space units purely from the grant this slave
 * already holds, without contacting the master. On success the space moves
 * from lqe_waiting_write to lqe_pending_write. NOTE(review): the locking
 * prologue, return-code plumbing and -EDQUOT path are elided in this
 * excerpt — verify against the full source. */
265 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
271 if (!lqe->lqe_enforced)
272 /* not enforced any more, we are good */
276 /* use latest usage */
277 usage = lqe->lqe_usage;
278 /* take pending write into account */
279 usage += lqe->lqe_pending_write;
/* enough grant left after accounting for space about to be released? */
281 if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
282 /* Yay! we got enough space */
283 lqe->lqe_pending_write += space;
284 lqe->lqe_waiting_write -= space;
286 } else if (lqe->lqe_edquot) {
291 lqe_write_unlock(lqe);
/* Compute qb_count/qb_flags/qb_usage for a quota request of kind \a op
 * (acquire, release, report or adjust) from the current state of \a lqe.
 * Returns true when a request actually needs to be sent (qb_flags set).
 * NOTE(review): the switch/case scaffolding and several labels ("again")
 * are elided in this excerpt; the visible branches correspond to the
 * acquire, report, release and adjust cases respectively. */
296 static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op,
297 struct quota_body *qbody)
299 struct qsd_qtype_info *qqi;
300 __u64 usage, granted;
302 if (!lqe->lqe_enforced && op != QSD_REL)
307 LASSERT(lqe->lqe_pending_rel == 0);
/* projected usage: account for in-flight and queued writes */
308 usage = lqe->lqe_usage;
309 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
310 granted = lqe->lqe_granted;
316 /* if we overconsumed quota space, we report usage in request
317 * so that master can adjust it unconditionally */
318 if (lqe->lqe_usage > lqe->lqe_granted) {
319 qbody->qb_usage = lqe->lqe_usage;
320 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
321 granted = lqe->lqe_usage;
323 /* acquire as much as needed, but not more */
324 if (usage > granted) {
325 qbody->qb_count = usage - granted;
326 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
330 /* When reporting quota (during reintegration or on setquota
331 * glimpse), we should release granted space if usage is 0.
332 * Otherwise, if the usage is less than granted, we need to
333 * acquire the per-ID lock to make sure the unused grant can be
334 * reclaimed by per-ID lock glimpse. */
335 if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
336 LQUOTA_DEBUG(lqe, "Release on report!");
337 GOTO(again, op = QSD_REL);
338 } else if (lqe->lqe_usage == lqe->lqe_granted) {
339 LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
340 "anything on report!");
341 } else if (lqe->lqe_usage < lqe->lqe_granted) {
342 LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
344 qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
346 LASSERT(lqe->lqe_usage > lqe->lqe_granted);
347 LQUOTA_DEBUG(lqe, "Reporting usage");
348 qbody->qb_usage = lqe->lqe_usage;
349 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
353 /* release unused quota space unconditionally */
354 if (lqe->lqe_granted > lqe->lqe_usage) {
355 qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
356 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
360 /* need to re-acquire per-ID lock or release all grant */
361 if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
362 lqe->lqe_granted > lqe->lqe_usage)
363 GOTO(again, op = QSD_REP);
365 /* release spare grant */
366 if (granted > usage + lqe->lqe_qunit) {
367 /* pre-release quota space */
368 qbody->qb_count = granted - usage;
369 /* if usage == 0, release all granted space */
371 /* try to keep one qunit of quota space */
372 qbody->qb_count -= lqe->lqe_qunit;
373 /* but don't release less than qtune to avoid
374 * releasing space too often */
375 if (qbody->qb_count < lqe->lqe_qtune)
376 qbody->qb_count = lqe->lqe_qtune;
378 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
382 /* if we overconsumed quota space, we report usage in request
383 * so that master can adjust it unconditionally */
384 if (lqe->lqe_usage > lqe->lqe_granted) {
385 qbody->qb_usage = lqe->lqe_usage;
386 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
387 granted = lqe->lqe_usage;
/* pre-acquire requires a held per-ID lock and non-zero usage, and is
 * suppressed once edquot is set or pre-acquire was disabled */
390 if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
391 lustre_handle_is_used(&lqe->lqe_lockh) && usage > 0 &&
392 lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
393 /* To pre-acquire quota space, we report how much spare
394 * quota space the slave currently owns, then the master
395 * will grant us back how much we can pretend given the
396 * current state of affairs */
397 if (granted <= usage)
400 qbody->qb_count = granted - usage;
401 qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
406 CERROR("Invalid qsd operation:%u\n", op);
/* a request is only worth sending when at least one flag was set */
410 return qbody->qb_flags != 0;
414 * Acquire/release quota space from master.
415 * There are at most 1 in-flight dqacq/dqrel.
417 * \param env - the environment passed by the caller
418 * \param lqe - is the qid entry to be processed
419 * \param op - operation that want to be performed by the caller
421 * \retval 0 - success
422 * \retval -EDQUOT : out of quota
423 * -EINPROGRESS : inform client to retry write/create
424 * -ve : other appropriate errors
/* NOTE(review): several lines of this function are elided in this excerpt
 * (RETURNs, lock acquisitions, the lvb allocation); comments below only
 * describe what the visible code demonstrates. */
426 int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
429 struct qsd_thread_info *qti = qsd_info(env);
430 struct quota_body *qbody = &qti->qti_body;
431 struct qsd_instance *qsd;
432 struct qsd_qtype_info *qqi;
433 struct ldlm_lock *lock;
435 bool intent = false, sync;
441 if (qsd->qsd_stopping) {
442 LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
443 /* Target is about to shut down, client will retry */
444 RETURN(-EINPROGRESS);
447 if (!qsd_ready(qsd)) {
448 LQUOTA_DEBUG(lqe, "Connection to master not ready");
452 /* In most case, reintegration must have been triggered (when enable
453 * quota or on OST start), however, in rare race condition (enabling
454 * quota when starting OSTs), we might miss triggering reintegration
457 * If the previous reintegration failed for some reason, we'll
458 * re-trigger it here as well. */
459 if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
460 LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
461 " off reintegration")
462 qsd_start_reint_thread(qqi);
463 RETURN(-EINPROGRESS);
466 LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);
468 /* Fill the remote global lock handle, master will check this handle
469 * to see if the slave is sending request with stale lock */
470 cfs_read_lock(&qsd->qsd_lock);
471 lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
472 cfs_read_unlock(&qsd->qsd_lock);
474 if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
/* translate our local global-lock handle into the master's remote
 * handle so the master can validate it */
477 lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
480 lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
483 /* We allow only one in-flight dqacq/dqrel for specified qid, if
484 * there is already in-flight dqacq/dqrel:
486 * - For QSD_ADJ: we should just abort it, since local limit is going
487 * to be changed soon;
488 * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel
489 * finished, and return success to the caller. The caller is
490 * responsible for retrying;
491 * - For QSD_REP: we should just abort it, since slave has already
492 * acquired/released grant; */
493 sync = (op == QSD_ACQ || op == QSD_REL) ? true : false;
494 LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d",
495 lqe->lqe_pending_req);
498 if (lqe->lqe_pending_req != 0) {
499 struct l_wait_info lwi = { 0 };
501 lqe_write_unlock(lqe);
503 LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op);
507 LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel");
/* block until the outstanding request completes (broadcast from
 * qsd_dqacq_completion) or the qsd starts shutting down */
508 l_wait_event(lqe->lqe_waiters,
509 !lqe_pending_dqacq(lqe) || qsd->qsd_stopping,
514 /* fill qb_count & qb_flags */
515 if (!qsd_calc_space(lqe, op, qbody)) {
516 lqe_write_unlock(lqe);
517 LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op);
/* mark this entry as having one request in flight before dropping the
 * write lock; cleared again in qsd_dqacq_completion() */
520 lqe->lqe_pending_req++;
521 lqe_write_unlock(lqe);
523 /* fill other quota body fields */
524 qbody->qb_fid = qqi->qqi_fid;
525 qbody->qb_id = lqe->lqe_id;
526 memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh));
527 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
529 /* hold a refcount until completion */
532 if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
533 /* check whether we already own a lock for this ID */
535 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
536 lqe_read_unlock(lqe);
538 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
540 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
541 if (req_is_preacq(qbody->qb_flags)) {
542 if (req_has_rep(qbody->qb_flags))
543 /* still want to report usage */
544 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
546 /* no pre-acquire if no per-ID lock */
547 GOTO(out, rc = -ENOLCK);
549 /* no lock found, should use intent */
552 } else if (req_is_acq(qbody->qb_flags) &&
553 qbody->qb_count == 0) {
554 /* found cached lock, no need to acquire */
560 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync,
561 qsd_dqacq_completion, qqi, &qti->qti_lockh,
/* intent path: enqueue the per-ID lock and piggy-back the DQACQ; the
 * lvb buffer carries back qunit/edquot state from the master */
564 union ldlm_wire_lvb *lvb;
568 GOTO(out, rc = -ENOMEM);
570 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync,
571 IT_QUOTA_DQACQ, qsd_dqacq_completion,
572 qqi, lvb, (void *)lqe);
574 /* the completion function will be called by qsd_send_dqacq or
/* error path: run the completion handler by hand so lqe_pending_req
 * is decremented and waiters are woken */
578 qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
584 * Quota enforcement handler. If local quota can satisfy this operation,
585 * return success, otherwise, acquire more quota from master.
586 * (for write operation, if master isn't available at this moment, return
587 * -EINPROGRESS to inform client to retry the write)
589 * \param env - the environment passed by the caller
590 * \param qsd - is the qsd instance associated with the device in charge
592 * \param qid - is the qid information attached in the transaction handle
593 * \param space - is the space required by the operation
594 * \param flags - if the operation is write, return caller no user/group
595 * and sync commit flags
597 * \retval 0 - success
598 * \retval -EDQUOT : out of quota
599 * -EINPROGRESS : inform client to retry write
600 * -ve : other appropriate errors
/* NOTE(review): several lines are elided in this excerpt (RETURNs, lock
 * calls, the out/out_flags labels); comments describe visible code only. */
602 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
603 struct lquota_id_info *qid, long long space,
606 struct lquota_entry *lqe;
607 int rc = 0, retry_cnt;
610 if (qid->lqi_qentry != NULL) {
611 /* we already had to deal with this id for this transaction */
612 lqe = qid->lqi_qentry;
613 if (!lqe->lqe_enforced)
616 /* look up lquota entry associated with qid */
617 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
619 RETURN(PTR_ERR(lqe));
620 if (!lqe->lqe_enforced) {
624 qid->lqi_qentry = lqe;
625 /* lqe will be released in qsd_op_end() */
629 /* when space is negative or null, we don't need to consume
630 * quota space. That said, we still want to perform space
631 * adjustments in qsd_op_end, so we return here, but with
632 * a reference on the lqe */
634 rc = qsd_refresh_usage(env, lqe);
640 LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
/* publish the requested space as "waiting" so concurrent threads and
 * qsd_calc_space() take it into account */
643 lqe->lqe_waiting_write += space;
644 lqe_write_unlock(lqe);
/* retry loop: refresh usage, try to satisfy locally, otherwise ask the
 * master for more grant and try again */
646 for (retry_cnt = 0; rc == 0; retry_cnt++) {
647 /* refresh disk usage if required */
648 rc = qsd_refresh_usage(env, lqe);
652 /* try to consume local quota space */
653 rc = qsd_acquire_local(lqe, space);
655 /* rc == 0, Wouhou! enough local quota space
656 * rc < 0, something bad happened */
659 /* need to acquire more quota space from master, this is done
661 rc = qsd_dqacq(env, lqe, QSD_ACQ);
662 LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
/* success: record the consumed space in the transaction slot */
667 qid->lqi_space += space;
669 LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);
/* failure: back out the waiting_write contribution added above */
672 lqe->lqe_waiting_write -= space;
674 if (flags && lqe->lqe_pending_write != 0)
675 /* Inform OSD layer that there are pending writes.
676 * It might want to retry after a sync if appropriate */
677 *flags |= QUOTA_FL_SYNC;
678 lqe_write_unlock(lqe);
680 /* convert recoverable error into -EINPROGRESS, and client will
681 * retry write on -EINPROGRESS. */
682 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
683 rc == -EAGAIN || rc == -EINTR)
689 LASSERT(qid->lqi_is_blk);
691 *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
/* compute a near-limit estimate: projected usage plus the per-device
 * sync threshold, to decide whether clients must write synchronously */
696 usage = lqe->lqe_usage;
697 usage += lqe->lqe_pending_write;
698 usage += lqe->lqe_waiting_write;
699 usage += qqi->qqi_qsd->qsd_sync_threshold;
701 /* if we should notify client to start sync write */
702 if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
703 *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
705 *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
706 lqe_read_unlock(lqe);
/* Return true iff two quota IDs refer to the same identity: same quota type
 * and same numeric id (qid_uid field is used for the comparison). */
712 static inline bool qid_equal(struct lquota_id_info *q1,
713 struct lquota_id_info *q2)
715 if (q1->lqi_type != q2->lqi_type)
717 return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
721 * Enforce quota, it's called in the declaration of each operation.
722 * qsd_op_end() will then be called later once all the operations have been
723 * completed in order to release/adjust the quota space.
725 * \param env - the environment passed by the caller
726 * \param qsd - is the qsd instance associated with the device in charge
728 * \param trans - is the quota transaction information
729 * \param qi - qid & space required by current operation
730 * \param flags - if the operation is write, return caller no user/group
731 * and sync commit flags
733 * \retval 0 - success
734 * \retval -EDQUOT : out of quota
735 * -EINPROGRESS : inform client to retry write
736 * -ve : other appropriate errors
738 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
739 struct lquota_trans *trans, struct lquota_id_info *qi,
/* a NULL qsd means quota is not configured on this device at all */
746 if (unlikely(qsd == NULL))
749 /* We don't enforce quota until the qsd_instance is started */
750 cfs_read_lock(&qsd->qsd_lock);
751 if (!qsd->qsd_started) {
752 cfs_read_unlock(&qsd->qsd_lock);
755 cfs_read_unlock(&qsd->qsd_lock);
757 /* ignore block quota on MDTs, ignore inode quota on OSTs */
758 if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
759 (qsd->qsd_is_md && qi->lqi_is_blk))
762 /* ignore quota enforcement request when:
763 * - quota isn't enforced for this quota type
764 * or - the user/group is root */
765 if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
768 LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
770 /* check whether we already allocated a slot for this id */
771 for (i = 0; i < trans->lqt_id_cnt; i++) {
772 if (qid_equal(qi, &trans->lqt_ids[i])) {
774 /* make sure we are not mixing inodes & blocks */
775 LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
781 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
782 CERROR("%s: more than %d qids enforced for a "
783 "transaction?\n", qsd->qsd_svname, i);
/* new id: claim the next free transaction slot and record the id */
788 trans->lqt_ids[i].lqi_id = qi->lqi_id;
789 trans->lqt_ids[i].lqi_type = qi->lqi_type;
790 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
794 /* manage quota enforcement for this ID */
795 rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
796 &trans->lqt_ids[i], qi->lqi_space, flags);
799 EXPORT_SYMBOL(qsd_op_begin);
802 * Post quota operation, pre-acquire/release quota from master.
804 * \param env - the environment passed by the caller
805 * \param qsd - is the qsd instance attached to the OSD device which
806 * is handling the operation.
807 * \param qqi - is the qsd_qtype_info structure associated with the quota ID
808 * subject to the operation
809 * \param qid - stores information related to his ID for the operation
810 * which has just completed
812 * \retval 0 - success
813 * \retval -ve - failure
/* NOTE(review): lines are elided in this excerpt (early return when
 * lqi_qentry is NULL, lock calls, lqe_putref); see full source. */
815 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
816 struct lquota_id_info *qid)
818 struct lquota_entry *lqe;
822 lqe = qid->lqi_qentry;
/* detach the entry from the transaction slot; the reference taken in
 * qsd_op_begin0() is presumably dropped below — TODO confirm */
825 qid->lqi_qentry = NULL;
827 /* refresh cached usage if a suitable environment is passed */
829 qsd_refresh_usage(env, lqe);
/* the transaction is done: space recorded at op_begin time is no longer
 * "pending" */
832 if (qid->lqi_space > 0)
833 lqe->lqe_pending_write -= qid->lqi_space;
835 adjust = qsd_adjust_needed(lqe);
838 lqe_write_unlock(lqe);
841 /* pre-acquire/release quota space is needed */
843 qsd_dqacq(env, lqe, QSD_ADJ);
845 /* no suitable environment, handle adjustment in
846 * separate thread context */
847 qsd_adjust_schedule(lqe, false, false);
854 * Post quota operation. It's called after each operation transaction stopped.
856 * \param env - the environment passed by the caller
857 * \param qsd - is the qsd instance associated with device which is handling
859 * \param qids - all qids information attached in the transaction handle
860 * \param count - is the number of qid entries in the qids array.
862 * \retval 0 - success
863 * \retval -ve - failure
865 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
866 struct lquota_trans *trans)
/* quota not configured on this device: nothing to undo */
871 if (unlikely(qsd == NULL))
874 /* We don't enforce quota until the qsd_instance is started */
875 cfs_read_lock(&qsd->qsd_lock);
876 if (!qsd->qsd_started) {
877 cfs_read_unlock(&qsd->qsd_lock);
880 cfs_read_unlock(&qsd->qsd_lock);
882 LASSERT(trans != NULL);
/* finish every id touched by this transaction; slots with no cached
 * lquota entry were never begun and are skipped */
884 for (i = 0; i < trans->lqt_id_cnt; i++) {
885 struct qsd_qtype_info *qqi;
887 if (trans->lqt_ids[i].lqi_qentry == NULL)
890 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
891 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
894 /* reset id_count to 0 so that a second accidental call to qsd_op_end()
895 * does not result in failure */
896 trans->lqt_id_cnt = 0;
899 EXPORT_SYMBOL(qsd_op_end);
/* Trigger a quota-space adjustment (pre-acquire/release) for a single ID of
 * quota type \a qtype, typically after an administrative change. Refreshes
 * the cached usage and, when qsd_adjust_needed() says so, sends a QSD_ADJ
 * request to the master. NOTE(review): some lines are elided in this
 * excerpt (RETURNs, lqe_putref, lock calls around refresh). */
901 void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
902 union lquota_id *qid, int qtype)
904 struct lquota_entry *lqe;
905 struct qsd_qtype_info *qqi;
909 if (unlikely(qsd == NULL))
912 /* We don't enforce quota until the qsd_instance is started */
913 cfs_read_lock(&qsd->qsd_lock);
914 if (!qsd->qsd_started) {
915 cfs_read_unlock(&qsd->qsd_lock);
918 cfs_read_unlock(&qsd->qsd_lock);
920 qqi = qsd->qsd_type_array[qtype];
/* skip adjustment when this quota type is disabled or the accounting
 * object is missing */
923 if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
/* NOTE(review): qsd_started is re-checked here under qsd_lock, mirroring
 * the check above — presumably guarding a race with startup; confirm
 * against the full source */
927 cfs_read_lock(&qsd->qsd_lock);
928 if (!qsd->qsd_started) {
929 cfs_read_unlock(&qsd->qsd_lock);
932 cfs_read_unlock(&qsd->qsd_lock);
934 lqe = lqe_locate(env, qqi->qqi_site, qid);
936 CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
937 qsd->qsd_svname, qid->qid_uid, qtype);
941 qsd_refresh_usage(env, lqe);
944 adjust = qsd_adjust_needed(lqe);
945 lqe_read_unlock(lqe);
948 qsd_dqacq(env, lqe, QSD_ADJ);
953 EXPORT_SYMBOL(qsd_adjust_quota);