4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2012 Intel, Inc.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_LQUOTA
37 #include "qsd_internal.h"
40 * helper function bumping lqe_pending_req if there is no quota request in
41 * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
43 static inline int qsd_request_enter(struct lquota_entry *lqe)
45 /* is there already a quota request in flight? */
46 if (lqe->lqe_pending_req != 0) {
47 LQUOTA_DEBUG(lqe, "already a request in flight");
51 if (lqe->lqe_pending_rel != 0) {
52 LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
53 lqe->lqe_pending_rel);
57 lqe->lqe_pending_req++;
62 * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
64 static inline void qsd_request_exit(struct lquota_entry *lqe)
66 if (lqe->lqe_pending_req != 1) {
67 LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
70 lqe->lqe_pending_req--;
71 lqe->lqe_pending_rel = 0;
72 cfs_waitq_broadcast(&lqe->lqe_waiters);
76 * Check whether a qsd instance is all set to send quota request to master.
77 * This includes checking whether:
78 * - the connection to master is set up and usable,
79 * - the qsd isn't stopping
80 * - reintegration has been successfully completed and all indexes are
83 * \param lqe - is the lquota entry for which we would like to send an quota
85 * \param lockh - is the remote handle of the global lock returned on success
87 * \retval 0 on success, appropriate error on failure
89 static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
91 struct qsd_qtype_info *qqi = lqe2qqi(lqe);
92 struct qsd_instance *qsd = qqi->qqi_qsd;
93 struct obd_import *imp = NULL;
94 struct ldlm_lock *lock;
97 cfs_read_lock(&qsd->qsd_lock);
98 /* is the qsd about to shut down? */
99 if (qsd->qsd_stopping) {
100 cfs_read_unlock(&qsd->qsd_lock);
101 LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
102 /* Target is about to shut down, client will retry */
103 RETURN(-EINPROGRESS);
106 /* is the connection to the quota master ready? */
107 if (qsd->qsd_exp_valid)
108 imp = class_exp2cliimp(qsd->qsd_exp);
109 if (imp == NULL || imp->imp_invalid) {
110 cfs_read_unlock(&qsd->qsd_lock);
111 LQUOTA_DEBUG(lqe, "connection to master not ready");
115 /* In most case, reintegration must have been triggered (when enable
116 * quota or on OST start), however, in rare race condition (enabling
117 * quota when starting OSTs), we might miss triggering reintegration
120 * If the previous reintegration failed for some reason, we'll
121 * re-trigger it here as well. */
122 if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
123 cfs_read_unlock(&qsd->qsd_lock);
124 LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
125 "kicking off reintegration");
126 qsd_start_reint_thread(qqi);
127 RETURN(-EINPROGRESS);
130 /* Fill the remote global lock handle, master will check this handle
131 * to see if the slave is sending request with stale lock */
132 lustre_handle_copy(lockh, &qqi->qqi_lockh);
133 cfs_read_unlock(&qsd->qsd_lock);
135 if (!lustre_handle_is_used(lockh))
138 lock = ldlm_handle2lock(lockh);
142 /* return remote lock handle to be packed in quota request */
143 lustre_handle_copy(lockh, &lock->l_remote_handle);
150 * Check whether any quota space adjustment (pre-acquire/release/report) is
151 * needed for a given quota ID. If a non-null \a qbody is passed, then the
152 * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
153 * to be packed in the quota request.
155 * \param lqe - is the lquota entry for which we would like to adjust quota
157 * \param qbody - is the quota body to fill, if not NULL.
159 * \retval true - space adjustment is required and \a qbody is filled, if not
161 * \retval false - no space adjustment required
163 static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
165 __u64 usage, granted;
168 usage = lqe->lqe_usage;
169 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
170 granted = lqe->lqe_granted;
175 if (!lqe->lqe_enforced) {
176 /* quota not enforced any more for this ID */
178 /* release all quota space unconditionally */
179 LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
181 qbody->qb_count = granted;
182 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
189 if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
190 /* No valid per-ID lock
191 * When reporting quota (during reintegration or on setquota
192 * glimpse), we should release granted space if usage is 0.
193 * Otherwise, if the usage is less than granted, we need to
194 * acquire the per-ID lock to make sure the unused grant can be
195 * reclaimed by per-ID lock glimpse. */
197 /* no on-disk usage and no outstanding activity, release
200 LQUOTA_DEBUG(lqe, "no usage, releasing all "
203 qbody->qb_count = granted;
204 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
208 LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
213 if (lqe->lqe_usage < lqe->lqe_granted) {
214 /* holding quota space w/o any lock, enqueue per-ID lock
216 LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
219 qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
224 if (lqe->lqe_usage > lqe->lqe_granted) {
225 /* quota overrun, report usage */
226 LQUOTA_DEBUG(lqe, "overrun, reporting usage");
228 qbody->qb_usage = lqe->lqe_usage;
229 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
233 LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
238 * Apply good old quota qunit adjustment logic which has been around
240 * 1. release spare quota space? */
241 if (granted > usage + lqe->lqe_qunit) {
242 /* pre-release quota space */
245 qbody->qb_count = granted - usage;
246 /* if usage == 0, release all granted space */
248 /* try to keep one qunit of quota space */
249 qbody->qb_count -= lqe->lqe_qunit;
250 /* but don't release less than qtune to avoid releasing
252 if (qbody->qb_count < lqe->lqe_qtune)
253 qbody->qb_count = lqe->lqe_qtune;
255 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
259 /* 2. Any quota overrun? */
260 if (lqe->lqe_usage > lqe->lqe_granted) {
261 /* we overconsumed quota space, we report usage in request so
262 * that master can adjust it unconditionally */
265 qbody->qb_usage = lqe->lqe_usage;
266 granted = lqe->lqe_usage;
267 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
270 /* 3. Time to pre-acquire? */
271 if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
272 lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
273 /* To pre-acquire quota space, we report how much spare quota
274 * space the slave currently owns, then the master will grant us
275 * back how much we can pretend given the current state of
279 if (granted <= usage)
282 qbody->qb_count = granted - usage;
283 qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
288 RETURN(qbody->qb_flags != 0);
294 * Helper function returning true when quota space need to be adjusted (some
295 * unused space should be free or pre-acquire) and false otherwise.
297 static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
299 return qsd_calc_adjust(lqe, NULL);
303 * Callback function called when an acquire/release request sent to the master
306 static void qsd_req_completion(const struct lu_env *env,
307 struct qsd_qtype_info *qqi,
308 struct quota_body *reqbody,
309 struct quota_body *repbody,
310 struct lustre_handle *lockh,
311 union ldlm_wire_lvb *lvb,
314 struct lquota_entry *lqe = (struct lquota_entry *)arg;
315 struct qsd_thread_info *qti;
317 bool adjust = false, cancel = false;
320 LASSERT(qqi != NULL && lqe != NULL);
322 /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
324 rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
326 LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
328 /* can't afford to adjust quota space with no suitable lu_env */
329 GOTO(out_noadjust, rc);
334 LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
337 /* despite -EDQUOT & -EINPROGRESS errors, the master might still
338 * grant us back quota space to adjust quota overrun */
339 if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
340 if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
341 ret != -ESHUTDOWN && ret != -EAGAIN)
342 /* print errors only if return code is unexpected */
343 LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
344 ret, reqbody->qb_flags);
348 /* Set the lqe_lockh */
349 if (lustre_handle_is_used(lockh) &&
350 !lustre_handle_equal(lockh, &lqe->lqe_lockh))
351 lustre_handle_copy(&lqe->lqe_lockh, lockh);
353 /* If the replied qb_count is zero, it means master didn't process
354 * the DQACQ since the limit for this ID has been removed, so we
355 * should not update quota entry & slave index copy neither. */
356 if (repbody != NULL && repbody->qb_count != 0) {
357 LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
359 if (req_is_rel(reqbody->qb_flags)) {
360 if (lqe->lqe_granted < repbody->qb_count) {
361 LQUOTA_ERROR(lqe, "can't release more space "
362 "than owned "LPU64"<"LPU64,
365 lqe->lqe_granted = 0;
367 lqe->lqe_granted -= repbody->qb_count;
369 /* Cancel the per-ID lock initiatively when there
370 * isn't any usage & grant, which can avoid master
371 * sending glimpse unnecessarily to this slave on
373 if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
374 !lqe->lqe_waiting_write && !lqe->lqe_usage)
377 lqe->lqe_granted += repbody->qb_count;
379 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
380 lqe_write_unlock(lqe);
382 /* Update the slave index file in the dedicated thread. So far,
383 * We don't update the version of slave index copy on DQACQ.
384 * No locking is necessary since nobody can change
385 * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
386 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
391 /* extract information from lvb */
392 if (ret == 0 && lvb != 0) {
393 if (lvb->l_lquota.lvb_id_qunit != 0)
394 qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
395 if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
396 lqe->lqe_edquot = true;
398 lqe->lqe_edquot = false;
399 } else if (repbody != NULL && repbody->qb_qunit != 0) {
400 qsd_set_qunit(lqe, repbody->qb_qunit);
403 /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
404 * flooding the master with acquire request. Pre-acquire will be turned
405 * on again as soon as qunit is modified */
406 if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
407 lqe->lqe_nopreacq = true;
409 adjust = qsd_adjust_needed(lqe);
411 qsd_request_exit(lqe);
412 lqe_write_unlock(lqe);
414 /* release reference on per-ID lock */
415 if (lustre_handle_is_used(lockh))
416 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
419 qsd_adjust_schedule(lqe, false, true);
421 if (!ret || ret == -EDQUOT)
422 qsd_adjust_schedule(lqe, false, false);
424 qsd_adjust_schedule(lqe, true, false);
434 * Try to consume local quota space.
436 * \param lqe - is the qid entry to be processed
437 * \param space - is the amount of quota space needed to complete the operation
439 * \retval 0 - success
440 * \retval -EDQUOT - out of quota
441 * \retval -EAGAIN - need to acquire space from master
443 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
449 if (!lqe->lqe_enforced)
450 /* not enforced any more, we are good */
454 /* use latest usage */
455 usage = lqe->lqe_usage;
456 /* take pending write into account */
457 usage += lqe->lqe_pending_write;
459 if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
460 /* Yay! we got enough space */
461 lqe->lqe_pending_write += space;
462 lqe->lqe_waiting_write -= space;
464 } else if (lqe->lqe_edquot) {
469 lqe_write_unlock(lqe);
475 * Compute how much quota space should be acquire from the master based
476 * on how much is currently granted to this slave and pending/waiting
479 * \param lqe - is the lquota entry for which we would like to adjust quota
481 * \param qbody - is the quota body of the acquire request to fill
483 * \retval true - space acquisition is needed and qbody is filled
484 * \retval false - no space acquisition required
486 static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
487 struct quota_body *qbody)
489 __u64 usage, granted;
491 usage = lqe->lqe_usage;
492 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
493 granted = lqe->lqe_granted;
497 /* if we overconsumed quota space, we report usage in request so that
498 * master can adjust it unconditionally */
499 if (lqe->lqe_usage > lqe->lqe_granted) {
500 qbody->qb_usage = lqe->lqe_usage;
501 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
502 granted = lqe->lqe_usage;
505 /* acquire as much as needed, but not more */
506 if (usage > granted) {
507 qbody->qb_count = usage - granted;
508 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
511 return qbody->qb_flags != 0;
515 * Acquire quota space from master.
516 * There are at most 1 in-flight dqacq/dqrel.
518 * \param env - the environment passed by the caller
519 * \param lqe - is the qid entry to be processed
521 * \retval 0 - success
522 * \retval -EDQUOT - out of quota
523 * \retval -EINPROGRESS - inform client to retry write/create
524 * \retval -EBUSY - already a quota request in flight
525 * \retval -ve - other appropriate errors
527 static int qsd_acquire_remote(const struct lu_env *env,
528 struct lquota_entry *lqe)
530 struct qsd_thread_info *qti = qsd_info(env);
531 struct quota_body *qbody = &qti->qti_body;
532 struct qsd_instance *qsd;
533 struct qsd_qtype_info *qqi;
537 memset(qbody, 0, sizeof(*qbody));
538 rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
547 /* is quota really enforced for this id? */
548 if (!lqe->lqe_enforced) {
549 lqe_write_unlock(lqe);
550 LQUOTA_DEBUG(lqe, "quota not enforced any more");
554 /* fill qb_count & qb_flags */
555 if (!qsd_calc_acquire(lqe, qbody)) {
556 lqe_write_unlock(lqe);
557 LQUOTA_DEBUG(lqe, "No acquire required");
561 /* only 1 quota request in flight for a given ID is allowed */
562 rc = qsd_request_enter(lqe);
564 lqe_write_unlock(lqe);
568 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
569 lqe_write_unlock(lqe);
571 /* hold a refcount until completion */
574 /* fill other quota body fields */
575 qbody->qb_fid = qqi->qqi_fid;
576 qbody->qb_id = lqe->lqe_id;
578 /* check whether we already own a valid lock for this ID */
579 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
581 union ldlm_wire_lvb *lvb;
586 qsd_req_completion(env, qqi, qbody, NULL,
587 &qti->qti_lockh, NULL, lqe, rc);
590 /* no lock found, should use intent */
591 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
592 IT_QUOTA_DQACQ, qsd_req_completion,
593 qqi, lvb, (void *)lqe);
595 /* lock found, should use regular dqacq */
596 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
597 qsd_req_completion, qqi, &qti->qti_lockh,
601 /* the completion function will be called by qsd_send_dqacq or
607 * Acquire \a space of quota space in order to complete an operation.
608 * Try to consume local quota space first and send acquire request to quota
609 * master if required.
611 * \param env - the environment passed by the caller
612 * \param lqe - is the qid entry to be processed
613 * \param space - is the amount of quota required for the operation
614 * \param ret - is the return code (-EDQUOT, -EINPROGRESS, ...)
616 * \retval true - exit from l_wait_event and real return value in \a ret
617 * \retval false - continue waiting
619 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
620 long long space, int *ret)
625 for (count = 0; rc == 0; count++) {
626 LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
628 if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
633 /* refresh disk usage */
634 rc = qsd_refresh_usage(env, lqe);
638 /* try to consume local quota space first */
639 rc = qsd_acquire_local(lqe, space);
641 /* rc == 0, Wouhou! enough local quota space
642 * rc < 0, something bad happened */
645 /* need to acquire more quota space from master */
646 rc = qsd_acquire_remote(env, lqe);
650 /* already a request in flight, continue waiting */
653 RETURN(true); /* exit from l_wait_event */
657 * Quota enforcement handler. If local quota can satisfy this operation,
658 * return success, otherwise, acquire more quota from master.
659 * (for write operation, if master isn't available at this moment, return
660 * -EINPROGRESS to inform client to retry the write)
662 * \param env - the environment passed by the caller
663 * \param qsd - is the qsd instance associated with the device in charge
665 * \param qid - is the qid information attached in the transaction handle
666 * \param space - is the space required by the operation
667 * \param flags - if the operation is write, return caller no user/group
668 * and sync commit flags
670 * \retval 0 - success
671 * \retval -EDQUOT - out of quota
672 * \retval -EINPROGRESS - inform client to retry write
673 * \retval -ve - other appropriate errors
675 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
676 struct lquota_id_info *qid, long long space,
679 struct lquota_entry *lqe;
680 int rc, ret = -EINPROGRESS;
681 struct l_wait_info lwi;
684 if (qid->lqi_qentry != NULL) {
685 /* we already had to deal with this id for this transaction */
686 lqe = qid->lqi_qentry;
687 if (!lqe->lqe_enforced)
690 /* look up lquota entry associated with qid */
691 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
693 RETURN(PTR_ERR(lqe));
694 if (!lqe->lqe_enforced) {
698 qid->lqi_qentry = lqe;
699 /* lqe will be released in qsd_op_end() */
703 /* when space is negative or null, we don't need to consume
704 * quota space. That said, we still want to perform space
705 * adjustments in qsd_op_end, so we return here, but with
706 * a reference on the lqe */
708 rc = qsd_refresh_usage(env, lqe);
714 LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
717 lqe->lqe_waiting_write += space;
718 lqe_write_unlock(lqe);
720 /* acquire quota space for the operation, cap overall wait time to
721 * prevent a service thread from being stuck for too long */
722 lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
724 rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
727 if (rc == 0 && ret == 0) {
728 qid->lqi_space += space;
733 LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
736 lqe->lqe_waiting_write -= space;
738 if (flags && lqe->lqe_pending_write != 0)
739 /* Inform OSD layer that there are pending writes.
740 * It might want to retry after a sync if appropriate */
741 *flags |= QUOTA_FL_SYNC;
742 lqe_write_unlock(lqe);
744 /* convert recoverable error into -EINPROGRESS, client will
746 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
747 rc == -EAGAIN || rc == -EINTR) {
749 } else if (rc == -ESRCH) {
751 LQUOTA_ERROR(lqe, "ID isn't enforced on master, it "
752 "probably due to a legeal race, if this "
753 "message is showing up constantly, there "
754 "could be some inconsistence between "
755 "master & slave, and quota reintegration "
756 "needs be re-triggered.");
762 LASSERT(qid->lqi_is_blk);
764 *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
769 usage = lqe->lqe_usage;
770 usage += lqe->lqe_pending_write;
771 usage += lqe->lqe_waiting_write;
772 usage += qqi->qqi_qsd->qsd_sync_threshold;
774 /* if we should notify client to start sync write */
775 if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
776 *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
778 *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
779 lqe_read_unlock(lqe);
786 * helper function comparing two lquota_id_info structures
788 static inline bool qid_equal(struct lquota_id_info *q1,
789 struct lquota_id_info *q2)
791 if (q1->lqi_type != q2->lqi_type)
793 return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
797 * Enforce quota, it's called in the declaration of each operation.
798 * qsd_op_end() will then be called later once all the operations have been
799 * completed in order to release/adjust the quota space.
801 * \param env - the environment passed by the caller
802 * \param qsd - is the qsd instance associated with the device in charge of
804 * \param trans - is the quota transaction information
805 * \param qi - qid & space required by current operation
806 * \param flags - if the operation is write, return caller no user/group and
809 * \retval 0 - success
810 * \retval -EDQUOT - out of quota
811 * \retval -EINPROGRESS - inform client to retry write
812 * \retval -ve - other appropriate errors
814 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
815 struct lquota_trans *trans, struct lquota_id_info *qi,
822 if (unlikely(qsd == NULL))
825 /* We don't enforce quota until the qsd_instance is started */
826 cfs_read_lock(&qsd->qsd_lock);
827 if (!qsd->qsd_started) {
828 cfs_read_unlock(&qsd->qsd_lock);
831 cfs_read_unlock(&qsd->qsd_lock);
833 /* ignore block quota on MDTs, ignore inode quota on OSTs */
834 if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
835 (qsd->qsd_is_md && qi->lqi_is_blk))
838 /* ignore quota enforcement request when:
839 * - quota isn't enforced for this quota type
840 * or - the user/group is root */
841 if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
844 LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
846 /* check whether we already allocated a slot for this id */
847 for (i = 0; i < trans->lqt_id_cnt; i++) {
848 if (qid_equal(qi, &trans->lqt_ids[i])) {
850 /* make sure we are not mixing inodes & blocks */
851 LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
857 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
858 CERROR("%s: more than %d qids enforced for a "
859 "transaction?\n", qsd->qsd_svname, i);
864 trans->lqt_ids[i].lqi_id = qi->lqi_id;
865 trans->lqt_ids[i].lqi_type = qi->lqi_type;
866 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
870 /* manage quota enforcement for this ID */
871 rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
872 &trans->lqt_ids[i], qi->lqi_space, flags);
875 EXPORT_SYMBOL(qsd_op_begin);
878 * Adjust quota space (by acquiring or releasing) hold by the quota slave.
879 * This function is called after each quota request completion and during
880 * reintegration in order to report usage or re-acquire quota locks.
881 * Space adjustment is aborted if there is already a quota request in flight
884 * \param env - the environment passed by the caller
885 * \param lqe - is the qid entry to be processed
887 * \retval 0 on success, appropriate errors on failure
889 int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
891 struct qsd_thread_info *qti = qsd_info(env);
892 struct quota_body *qbody = &qti->qti_body;
893 struct qsd_instance *qsd;
894 struct qsd_qtype_info *qqi;
899 memset(qbody, 0, sizeof(*qbody));
900 rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
909 /* fill qb_count & qb_flags */
910 if (!qsd_calc_adjust(lqe, qbody)) {
911 lqe_write_unlock(lqe);
912 LQUOTA_DEBUG(lqe, "No adjust required");
916 /* only 1 quota request in flight for a given ID is allowed */
917 rc = qsd_request_enter(lqe);
919 /* already a request in flight, space adjustment will be run
920 * again on request completion */
921 lqe_write_unlock(lqe);
925 if (req_is_rel(qbody->qb_flags))
926 lqe->lqe_pending_rel = qbody->qb_count;
927 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
928 lqe_write_unlock(lqe);
930 /* hold a refcount until completion */
933 /* fill other quota body fields */
934 qbody->qb_fid = qqi->qqi_fid;
935 qbody->qb_id = lqe->lqe_id;
937 if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
938 /* check whether we own a valid lock for this ID */
939 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
941 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
942 if (req_is_preacq(qbody->qb_flags)) {
943 if (req_has_rep(qbody->qb_flags))
944 /* still want to report usage */
945 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
947 /* no pre-acquire if no per-ID lock */
948 GOTO(out, rc = -ENOLCK);
950 /* no lock found, should use intent */
953 } else if (req_is_acq(qbody->qb_flags) &&
954 qbody->qb_count == 0) {
955 /* found cached lock, no need to acquire */
959 /* release and report don't need a per-ID lock */
960 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
964 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
965 qsd_req_completion, qqi, &qti->qti_lockh,
968 union ldlm_wire_lvb *lvb;
972 GOTO(out, rc = -ENOMEM);
974 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
975 IT_QUOTA_DQACQ, qsd_req_completion,
976 qqi, lvb, (void *)lqe);
978 /* the completion function will be called by qsd_send_dqacq or
982 qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
988 * Post quota operation, pre-acquire/release quota from master.
990 * \param env - the environment passed by the caller
991 * \param qsd - is the qsd instance attached to the OSD device which
992 * is handling the operation.
993 * \param qqi - is the qsd_qtype_info structure associated with the quota ID
994 * subject to the operation
995 * \param qid - stores information related to his ID for the operation
996 * which has just completed
998 * \retval 0 - success
999 * \retval -ve - failure
1001 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
1002 struct lquota_id_info *qid)
1004 struct lquota_entry *lqe;
1008 lqe = qid->lqi_qentry;
1011 qid->lqi_qentry = NULL;
1013 /* refresh cached usage if a suitable environment is passed */
1015 qsd_refresh_usage(env, lqe);
1017 lqe_write_lock(lqe);
1018 if (qid->lqi_space > 0)
1019 lqe->lqe_pending_write -= qid->lqi_space;
1021 adjust = qsd_adjust_needed(lqe);
1024 lqe_write_unlock(lqe);
1027 /* pre-acquire/release quota space is needed */
1029 qsd_adjust(env, lqe);
1031 /* no suitable environment, handle adjustment in
1032 * separate thread context */
1033 qsd_adjust_schedule(lqe, false, false);
1040 * Post quota operation. It's called after each operation transaction stopped.
1042 * \param env - the environment passed by the caller
1043 * \param qsd - is the qsd instance associated with device which is handling
1045 * \param qids - all qids information attached in the transaction handle
1046 * \param count - is the number of qid entries in the qids array.
1048 * \retval 0 - success
1049 * \retval -ve - failure
1051 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
1052 struct lquota_trans *trans)
1057 if (unlikely(qsd == NULL))
1060 /* We don't enforce quota until the qsd_instance is started */
1061 cfs_read_lock(&qsd->qsd_lock);
1062 if (!qsd->qsd_started) {
1063 cfs_read_unlock(&qsd->qsd_lock);
1066 cfs_read_unlock(&qsd->qsd_lock);
1068 LASSERT(trans != NULL);
1070 for (i = 0; i < trans->lqt_id_cnt; i++) {
1071 struct qsd_qtype_info *qqi;
1073 if (trans->lqt_ids[i].lqi_qentry == NULL)
1076 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
1077 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
1080 /* reset id_count to 0 so that a second accidental call to qsd_op_end()
1081 * does not result in failure */
1082 trans->lqt_id_cnt = 0;
1085 EXPORT_SYMBOL(qsd_op_end);
1088 * Trigger pre-acquire/release if necessary.
1089 * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
1090 * quota accounting isn't updated when the transaction stopped. Instead, it'll
1091 * be updated on the final iput, so qsd_op_adjust() will be called then (in
1092 * osd_object_delete()) to trigger quota release if necessary.
1094 * \param env - the environment passed by the caller
1095 * \param qsd - is the qsd instance associated with the device in charge
1097 * \param qid - is the lquota ID of the user/group for which to trigger
1098 * quota space adjustment
1099 * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
1101 void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
1102 union lquota_id *qid, int qtype)
1104 struct lquota_entry *lqe;
1105 struct qsd_qtype_info *qqi;
1109 if (unlikely(qsd == NULL))
1112 /* We don't enforce quota until the qsd_instance is started */
1113 cfs_read_lock(&qsd->qsd_lock);
1114 if (!qsd->qsd_started) {
1115 cfs_read_unlock(&qsd->qsd_lock);
1118 cfs_read_unlock(&qsd->qsd_lock);
1120 qqi = qsd->qsd_type_array[qtype];
1123 if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
1127 cfs_read_lock(&qsd->qsd_lock);
1128 if (!qsd->qsd_started) {
1129 cfs_read_unlock(&qsd->qsd_lock);
1132 cfs_read_unlock(&qsd->qsd_lock);
1134 lqe = lqe_locate(env, qqi->qqi_site, qid);
1136 CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
1137 qsd->qsd_svname, qid->qid_uid, qtype);
1141 qsd_refresh_usage(env, lqe);
1144 adjust = qsd_adjust_needed(lqe);
1145 lqe_read_unlock(lqe);
1148 qsd_adjust(env, lqe);
1153 EXPORT_SYMBOL(qsd_op_adjust);