1 // SPDX-License-Identifier: GPL-2.0
4 * Copyright (c) 2012, 2017, Intel Corporation.
5 * Use is subject to license terms.
9 * Author: Johann Lombardi <johann.lombardi@intel.com>
10 * Author: Niu Yawei <yawei.niu@intel.com>
13 #define DEBUG_SUBSYSTEM S_LQUOTA
15 #include "qsd_internal.h"
18 * helper function bumping lqe_pending_req if there is no quota request in
19 * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
21 static inline int qsd_request_enter(struct lquota_entry *lqe)
23 /* is there already a quota request in flight? */
24 if (lqe->lqe_pending_req != 0) {
25 LQUOTA_DEBUG(lqe, "already a request in flight");
/* sanity check: with no request in flight, there can be no pending
 * release either (lqe_pending_rel is cleared in qsd_request_exit()) */
29 if (lqe->lqe_pending_rel != 0) {
30 LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
31 lqe->lqe_pending_rel);
/* record that one quota request is now outstanding for this entry;
 * callers (qsd_acquire_remote()/qsd_adjust()) invoke this under
 * lqe_write_lock */
35 lqe->lqe_pending_req++;
40 * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
42 static inline void qsd_request_exit(struct lquota_entry *lqe)
/* exactly one request must be in flight (set by qsd_request_enter()) */
44 if (lqe->lqe_pending_req != 1) {
45 LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
48 lqe->lqe_pending_req--;
/* any pending release amount was tied to the completed request */
49 lqe->lqe_pending_rel = 0;
/* wake threads blocked in qsd_op_begin0() waiting for quota space */
50 wake_up(&lqe->lqe_waiters);
54 * Check whether a qsd instance is all set to send quota request to master.
55 * This includes checking whether:
56 * - the connection to master is set up and usable,
57 * - the qsd isn't stopping
58 * - reintegration has been successfully completed and all indexes are
61 * \param lqe - is the lquota entry for which we would like to send a quota
63 * \param lockh - is the remote handle of the global lock returned on success
65 * \retval 0 on success, appropriate error on failure
67 static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
69 struct qsd_qtype_info *qqi = lqe2qqi(lqe);
70 struct qsd_instance *qsd = qqi->qqi_qsd;
71 struct obd_import *imp = NULL;
72 struct ldlm_lock *lock;
/* qsd state (stopping flag, export validity, lock handle) is protected
 * by qsd_lock; take it for reading for the whole set of checks */
75 read_lock(&qsd->qsd_lock);
76 /* is the qsd about to shut down? */
77 if (qsd->qsd_stopping) {
78 read_unlock(&qsd->qsd_lock);
79 LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
80 /* Target is about to shut down, client will retry */
84 /* is the connection to the quota master ready? */
85 if (qsd->qsd_exp_valid)
86 imp = class_exp2cliimp(qsd->qsd_exp);
87 if (imp == NULL || imp->imp_invalid) {
88 read_unlock(&qsd->qsd_lock);
89 LQUOTA_DEBUG(lqe, "connection to master not ready");
93 /* In most case, reintegration must have been triggered (when enable
94 * quota or on OST start), however, in rare race condition (enabling
95 * quota when starting OSTs), we might miss triggering reintegration
98 * If the previous reintegration failed for some reason, we'll
99 * re-trigger it here as well. */
100 if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
101 read_unlock(&qsd->qsd_lock);
102 LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
103 "kicking off reintegration");
104 qsd_start_reint_thread(qqi);
/* caller forwards -EINPROGRESS to the client so it retries later */
105 RETURN(-EINPROGRESS);
108 /* Fill the remote global lock handle, master will check this handle
109 * to see if the slave is sending request with stale lock */
110 lustre_handle_copy(lockh, &qqi->qqi_lockh);
111 read_unlock(&qsd->qsd_lock);
113 if (!lustre_handle_is_used(lockh))
116 lock = ldlm_handle2lock(lockh);
120 /* return remote lock handle to be packed in quota request */
121 lustre_handle_copy(lockh, &lock->l_remote_handle);
128 * Check whether any quota space adjustment (pre-acquire/release/report) is
129 * needed for a given quota ID. If a non-null \a qbody is passed, then the
130 * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
131 * to be packed in the quota request.
133 * \param lqe - is the lquota entry for which we would like to adjust quota
135 * \param qbody - is the quota body to fill, if not NULL.
137 * \retval true - space adjustment is required and \a qbody is filled, if not
139 * \retval false - no space adjustment required
141 static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
143 __u64 usage, granted;
/* overall consumption = on-disk usage + in-flight and queued writes */
146 usage = lqe->lqe_usage;
147 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
148 granted = lqe->lqe_granted;
153 if (!lqe->lqe_enforced) {
154 /* quota not enforced any more for this ID */
156 /* release all quota space unconditionally */
157 LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
159 qbody->qb_count = granted;
160 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
167 if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
168 /* No valid per-ID lock
169 * When reporting quota (during reintegration or on setquota
170 * glimpse), we should release granted space if usage is 0.
171 * Otherwise, if the usage is less than granted, we need to
172 * acquire the per-ID lock to make sure the unused grant can be
173 * reclaimed by per-ID lock glimpse. */
175 /* no on-disk usage and no outstanding activity, release
178 LQUOTA_DEBUG(lqe, "no usage, releasing all "
181 qbody->qb_count = granted;
182 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
186 LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
191 if (lqe->lqe_usage < lqe->lqe_granted) {
192 /* holding quota space w/o any lock, enqueue per-ID lock
194 LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
197 qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
202 if (lqe->lqe_usage > lqe->lqe_granted) {
203 /* quota overrun, report usage */
204 LQUOTA_DEBUG(lqe, "overrun, reporting usage");
206 qbody->qb_usage = lqe->lqe_usage;
207 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
211 LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
/* from here on, a valid per-ID lock is held: apply the classic qunit
 * adjustment heuristics below */
216 * Apply good old quota qunit adjustment logic which has been around
218 * 1. revoke all extra grant
220 if (lqe->lqe_revoke) {
226 LQUOTA_DEBUG(lqe, "revoke pre-acquired quota: %llu - %llu\n",
228 qbody->qb_count = granted - usage;
229 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
233 /* 2. release spare quota space? */
234 if (granted > usage + lqe->lqe_qunit) {
235 /* pre-release quota space */
238 qbody->qb_count = granted - usage;
239 /* if usage == 0, release all granted space */
241 /* try to keep one qunit of quota space */
242 qbody->qb_count -= lqe->lqe_qunit;
243 /* but don't release less than qtune to avoid releasing
245 if (qbody->qb_count < lqe->lqe_qtune)
246 qbody->qb_count = lqe->lqe_qtune;
248 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
252 /* 3. Any quota overrun? */
253 if (lqe->lqe_usage > lqe->lqe_granted) {
254 /* we overconsumed quota space, we report usage in request so
255 * that master can adjust it unconditionally */
258 qbody->qb_usage = lqe->lqe_usage;
259 granted = lqe->lqe_usage;
260 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
263 /* 4. Time to pre-acquire? */
264 if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
265 lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
266 /* To pre-acquire quota space, we report how much spare quota
267 * space the slave currently owns, then the master will grant us
268 * back how much we can pretend given the current state of
272 if (granted <= usage)
275 qbody->qb_count = granted - usage;
276 qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
/* true when any REL/ACQ/REPORT/PREACQ flag was set above */
281 RETURN(qbody->qb_flags != 0);
287 * Helper function returning true when quota space needs to be adjusted (some
288 * unused space should be freed or pre-acquired) and false otherwise.
290 static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
/* dry-run of qsd_calc_adjust(): NULL qbody means "just tell me whether
 * an adjustment would be issued", nothing is filled in */
292 return qsd_calc_adjust(lqe, NULL);
296 * Callback function called when an acquire/release request sent to the master
299 static void qsd_req_completion(const struct lu_env *env,
300 struct qsd_qtype_info *qqi,
301 struct quota_body *reqbody,
302 struct quota_body *repbody,
303 struct lustre_handle *lockh,
304 struct lquota_lvb *lvb,
/* \a arg is the lquota_entry passed as opaque callback argument */
307 struct lquota_entry *lqe = (struct lquota_entry *)arg;
308 struct qsd_thread_info *qti;
310 bool adjust = false, cancel = false;
313 LASSERT(qqi != NULL && lqe != NULL);
315 /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
317 rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
/* NOTE(review): "environmnent" in the message below is a typo in the
 * runtime string; left untouched here since it changes log output */
319 LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
321 /* can't afford to adjust quota space with no suitable lu_env */
322 GOTO(out_noadjust, rc);
327 LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
330 /* despite -EDQUOT & -EINPROGRESS errors, the master might still
331 * grant us back quota space to adjust quota overrun */
332 if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
333 if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
334 ret != -ESHUTDOWN && ret != -EAGAIN)
335 /* print errors only if return code is unexpected */
336 LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
337 ret, reqbody->qb_flags);
341 /* Set the lqe_lockh */
342 if (lustre_handle_is_used(lockh) &&
343 !lustre_handle_equal(lockh, &lqe->lqe_lockh))
344 lustre_handle_copy(&lqe->lqe_lockh, lockh);
346 /* If the replied qb_count is zero, it means master didn't process
347 * the DQACQ since the limit for this ID has been removed, so we
348 * should not update quota entry & slave index copy neither. */
349 if (repbody != NULL && repbody->qb_count != 0) {
350 LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
352 if (lqe->lqe_is_reset) {
353 lqe->lqe_granted = 0;
354 } else if (req_is_rel(reqbody->qb_flags)) {
/* release reply: granted must shrink, never below zero */
355 if (lqe->lqe_granted < repbody->qb_count) {
356 LQUOTA_ERROR(lqe, "can't release more space "
357 "than owned %llu<%llu",
360 lqe->lqe_granted = 0;
362 lqe->lqe_granted -= repbody->qb_count;
364 /* Cancel the per-ID lock initiatively when there
365 * isn't any usage & grant, which can avoid master
366 * sending glimpse unnecessarily to this slave on
368 if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
369 !lqe->lqe_waiting_write && !lqe->lqe_usage)
/* acquire reply: master granted us more space */
372 lqe->lqe_granted += repbody->qb_count;
374 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
375 lqe_write_unlock(lqe);
377 /* Update the slave index file in the dedicated thread. So far,
378 * We don't update the version of slave index copy on DQACQ.
379 * No locking is necessary since nobody can change
380 * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
381 if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_GRANT))
382 qti->qti_rec.lqr_slv_rec.qsr_granted =
383 0xFFFFFFFFFFDEC80CULL;
384 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
389 /* extract information from lvb */
390 if (ret == 0 && lvb != NULL) {
391 if (lvb->lvb_id_qunit != 0)
392 qsd_set_qunit(lqe, lvb->lvb_id_qunit);
393 qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
394 } else if (repbody != NULL && repbody->qb_qunit != 0) {
395 qsd_set_qunit(lqe, repbody->qb_qunit);
398 /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
399 * flooding the master with acquire request. Pre-acquire will be turned
400 * on again as soon as qunit is modified */
401 if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
402 lqe->lqe_nopreacq = true;
/* decide, still under the write lock, whether another adjustment run
 * is needed once this request is done */
404 adjust = qsd_adjust_needed(lqe);
405 if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
/* cache acquire result so qsd_acquire_remote() can short-circuit
 * repeated acquires within ~1s */
406 lqe->lqe_acq_rc = ret;
407 lqe->lqe_acq_time = ktime_get_seconds();
/* drop in-flight marker and wake waiters (see qsd_request_exit()) */
410 qsd_request_exit(lqe);
411 lqe_write_unlock(lqe);
413 /* release reference on per-ID lock */
414 if (lustre_handle_is_used(lockh))
415 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
418 qsd_adjust_schedule(lqe, false, true);
420 if (!ret || ret == -EDQUOT)
421 qsd_adjust_schedule(lqe, false, false);
423 qsd_adjust_schedule(lqe, true, false);
432 * Try to consume local quota space.
434 * \param lqe - is the qid entry to be processed
435 * \param space - is the amount of quota space needed to complete the operation
437 * \retval 0 - success
438 * \retval -EDQUOT - out of quota
439 * \retval -EAGAIN - need to acquire space from master
441 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
447 if (!lqe->lqe_enforced)
448 /* not enforced any more, we are good */
452 /* use latest usage */
453 usage = lqe->lqe_usage;
454 /* take pending write into account */
455 usage += lqe->lqe_pending_write;
/* enough local grant (minus space already earmarked for release)? */
457 if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
458 /* Yay! we got enough space */
459 lqe->lqe_pending_write += space;
460 lqe->lqe_waiting_write -= space;
462 /* lqe_edquot flag is used to avoid flooding dqacq requests when
463 * the user is over quota, however, the lqe_edquot could be stale
464 * sometimes due to the race reply of dqacq vs. id lock glimpse
465 * (see LU-4505), so we revalidate it every 5 seconds. */
466 } else if (lqe->lqe_edquot &&
467 (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
472 lqe_write_unlock(lqe);
478 * Compute how much quota space should be acquire from the master based
479 * on how much is currently granted to this slave and pending/waiting
482 * \param lqe - is the lquota entry for which we would like to adjust quota
484 * \param qbody - is the quota body of the acquire request to fill
486 * \retval true - space acquisition is needed and qbody is filled
487 * \retval false - no space acquisition required
489 static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
490 struct quota_body *qbody)
492 __u64 usage, granted;
/* total demand = on-disk usage + in-flight + queued writes */
494 usage = lqe->lqe_usage;
495 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
496 granted = lqe->lqe_granted;
500 /* if we overconsumed quota space, we report usage in request so that
501 * master can adjust it unconditionally */
502 if (lqe->lqe_usage > lqe->lqe_granted) {
503 qbody->qb_usage = lqe->lqe_usage;
504 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
/* account as if the master already trued-up our grant to usage */
505 granted = lqe->lqe_usage;
508 /* acquire as much as needed, but not more */
509 if (usage > granted) {
510 qbody->qb_count = usage - granted;
511 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
/* true when a REPORT and/or ACQ request must be sent */
514 return qbody->qb_flags != 0;
518 * Acquire quota space from master.
519 * There are at most 1 in-flight dqacq/dqrel.
521 * \param env - the environment passed by the caller
522 * \param lqe - is the qid entry to be processed
524 * \retval 0 - success
525 * \retval -EDQUOT - out of quota
526 * \retval -EINPROGRESS - inform client to retry write/create
527 * \retval -EBUSY - already a quota request in flight
528 * \retval -ve - other appropriate errors
530 static int qsd_acquire_remote(const struct lu_env *env,
531 struct lquota_entry *lqe)
533 struct qsd_thread_info *qti = qsd_info(env);
534 struct quota_body *qbody = &qti->qti_body;
535 struct qsd_instance *qsd;
536 struct qsd_qtype_info *qqi;
540 memset(qbody, 0, sizeof(*qbody));
/* bail out early if connection/reintegration state isn't ready;
 * on success this also fills the global lock handle in qbody */
541 rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
550 /* is quota really enforced for this id? */
551 if (!lqe->lqe_enforced) {
552 lqe_write_unlock(lqe);
553 LQUOTA_DEBUG(lqe, "quota not enforced any more");
557 /* fill qb_count & qb_flags */
558 if (!qsd_calc_acquire(lqe, qbody)) {
559 lqe_write_unlock(lqe);
560 LQUOTA_DEBUG(lqe, "No acquire required");
564 /* check whether an acquire request completed recently */
565 if (lqe->lqe_acq_rc != 0 &&
566 lqe->lqe_acq_time > ktime_get_seconds() - 1) {
/* throttle: reuse the result cached by qsd_req_completion() */
567 lqe_write_unlock(lqe);
568 LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
569 RETURN(lqe->lqe_acq_rc);
572 /* only 1 quota request in flight for a given ID is allowed */
573 rc = qsd_request_enter(lqe);
575 lqe_write_unlock(lqe);
/* snapshot the per-ID lock handle before dropping the lock */
579 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
580 lqe_write_unlock(lqe);
582 /* hold a refcount until completion */
585 /* fill other quota body fields */
586 qbody->qb_fid = qqi->qqi_fid;
587 qbody->qb_id = lqe->lqe_id;
589 /* check whether we already own a valid lock for this ID */
590 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
592 struct lquota_lvb *lvb;
/* allocation failed path: complete the request immediately so the
 * in-flight marker and refcount are dropped */
597 qsd_req_completion(env, qqi, qbody, NULL,
598 &qti->qti_lockh, NULL, lqe, rc);
601 /* no lock found, should use intent */
602 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
603 IT_QUOTA_DQACQ, qsd_req_completion,
604 qqi, lvb, (void *)lqe);
606 /* lock found, should use regular dqacq */
607 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
608 qsd_req_completion, qqi, &qti->qti_lockh,
612 /* the completion function will be called by qsd_send_dqacq or
618 * Acquire \a space of quota space in order to complete an operation.
619 * Try to consume local quota space first and send acquire request to quota
620 * master if required.
622 * \param env - the environment passed by the caller
623 * \param lqe - is the qid entry to be processed
624 * \param space - is the amount of quota required for the operation
625 * \param ret - is the return code (-EDQUOT, -EINPROGRESS, ...)
627 * \retval true - stop waiting in wait_event_idle_timeout,
628 * and real return value in \a ret
629 * \retval false - continue waiting
631 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
632 long long space, int *ret)
635 int wait_pending = 0;
636 struct qsd_qtype_info *qqi = lqe2qqi(lqe);
/* loop until local space is obtained, a fatal error occurs, or a
 * remote request goes in flight (rc != 0 ends the loop) */
640 for (count = 0; rc == 0; count++) {
641 LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
643 if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
648 /* refresh disk usage */
649 rc = qsd_refresh_usage(env, lqe);
653 /* try to consume local quota space first */
654 rc = qsd_acquire_local(lqe, space);
656 /* rc == 0, Wouhou! enough local quota space
657 * rc < 0, something bad happened */
660 * There might be a window that commit transaction
661 * have updated usage but pending write doesn't change
662 * wait for it before acquiring remotely.
664 if (lqe->lqe_pending_write >= space && !wait_pending) {
/* force commit so usage catches up with pending writes */
666 dt_sync(env, qqi->qqi_qsd->qsd_dev);
670 /* if we have gotten some quota and still wait more quota,
671 * it's better to give QMT some time to reclaim from clients */
673 schedule_timeout_interruptible(cfs_time_seconds(1));
675 /* need to acquire more quota space from master */
676 rc = qsd_acquire_remote(env, lqe);
680 /* already a request in flight, continue waiting */
687 * Quota enforcement handler. If local quota can satisfy this operation,
688 * return success, otherwise, acquire more quota from master.
689 * (for write operation, if master isn't available at this moment, return
690 * -EINPROGRESS to inform client to retry the write)
692 * \param env - the environment passed by the caller
693 * \param qsd - is the qsd instance associated with the device in charge
695 * \param qid - is the qid information attached in the transaction handle
696 * \param space - is the space required by the operation
697 * \param flags - if the operation is write, return caller no user/group
698 * and sync commit flags
700 * \retval 0 - success
701 * \retval -EDQUOT - out of quota
702 * \retval -EINPROGRESS - inform client to retry write
703 * \retval -ve - other appropriate errors
705 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
706 struct lquota_id_info *qid, long long space,
707 enum osd_quota_local_flags *local_flags)
709 struct lquota_entry *lqe;
710 DEFINE_WAIT_FUNC(wait, woken_wake_function);
711 enum osd_quota_local_flags qtype_flag = 0;
712 int rc = 0, ret = -EINPROGRESS;
716 if (qid->lqi_qentry != NULL) {
717 /* we already had to deal with this id for this transaction */
718 lqe = qid->lqi_qentry;
719 if (!lqe->lqe_enforced)
722 /* look up lquota entry associated with qid */
723 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
725 RETURN(PTR_ERR(lqe));
726 if (!lqe->lqe_enforced) {
730 qid->lqi_qentry = lqe;
731 /* lqe will be released in qsd_op_end() */
735 /* when space is negative or null, we don't need to consume
736 * quota space. That said, we still want to perform space
737 * adjustments in qsd_op_end, so we return here, but with
738 * a reference on the lqe */
739 if (local_flags != NULL) {
740 rc = qsd_refresh_usage(env, lqe);
746 LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
/* publish our demand so qsd_calc_acquire() accounts for it */
749 lqe->lqe_waiting_write += space;
750 lqe_write_unlock(lqe);
752 /* acquire quota space for the operation, cap overall wait time to
753 * prevent a service thread from being stuck for too long
755 remaining = cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd));
756 add_wait_queue(&lqe->lqe_waiters, &wait);
/* qsd_acquire() returns true when a final status is known (in ret);
 * otherwise sleep until woken by qsd_request_exit() or timeout */
758 if (qsd_acquire(env, lqe, space, &ret))
761 remaining = wait_woken(&wait, TASK_IDLE, remaining);
762 } while (remaining > 0);
764 if (remaining > 0 && ret == 0) {
/* success: account the space against this transaction */
765 qid->lqi_space += space;
770 else if (remaining == 0)
773 LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
/* failure: withdraw our demand */
776 lqe->lqe_waiting_write -= space;
778 if (local_flags && lqe->lqe_pending_write != 0)
779 /* Inform OSD layer that there are pending writes.
780 * It might want to retry after a sync if appropriate
782 *local_flags |= QUOTA_FL_SYNC;
783 lqe_write_unlock(lqe);
785 /* convert recoverable error into -EINPROGRESS, client will
788 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
789 rc == -EAGAIN || rc == -EINTR) {
791 } else if (rc == -ESRCH) {
/* NOTE(review): "legeal" below is a typo ("legal") in a runtime log
 * message; left untouched since changing it alters log output */
794 "ID isn't enforced on master, it probably due to a legeal race, if this message is showing up constantly, there could be some inconsistence between master & slave, and quota reintegration needs be re-triggered.");
797 remove_wait_queue(&lqe->lqe_waiters, &wait);
799 if (local_flags != NULL) {
801 LASSERT(qid->lqi_is_blk);
803 *local_flags |= lquota_over_fl(qqi->qqi_qtype);
808 usage = lqe->lqe_pending_write;
809 usage += lqe->lqe_waiting_write;
810 /* There is a chance to successfully grant more quota
811 * but get edquot flag through glimpse. */
812 if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
813 (usage % lqe->lqe_qunit >
814 qqi->qqi_qsd->qsd_sync_threshold)))
815 usage += qqi->qqi_qsd->qsd_sync_threshold;
817 usage += lqe->lqe_usage;
819 qtype_flag = lquota_over_fl(qqi->qqi_qtype);
820 /* if we should notify client to start sync write */
821 if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
822 *local_flags |= qtype_flag;
824 *local_flags &= ~qtype_flag;
825 lqe_read_unlock(lqe);
832 * helper function comparing two lquota_id_info structures
834 static inline bool qid_equal(struct lquota_id_info *q1,
835 struct lquota_id_info *q2)
/* two ids match only if block/inode kind and quota type agree */
837 if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
839 return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
843 * Enforce quota, it's called in the declaration of each operation.
844 * qsd_op_end() will then be called later once all the operations have been
845 * completed in order to release/adjust the quota space.
847 * \param env - the environment passed by the caller
848 * \param qsd - is the qsd instance associated with the device in charge of
850 * \param trans - is the quota transaction information
851 * \param qi - qid & space required by current operation
852 * \param flags - if the operation is write, return caller no user/group and
855 * \retval 0 - success
856 * \retval -EDQUOT - out of quota
857 * \retval -EINPROGRESS - inform client to retry write
858 * \retval -ve - other appropriate errors
860 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
861 struct lquota_trans *trans, struct lquota_id_info *qi,
862 enum osd_quota_local_flags *local_flags)
868 /* fast path, ignore quota enforcement request for root owned files */
869 if (qi->lqi_id.qid_uid == 0)
872 if (unlikely(qsd == NULL))
875 if (qsd->qsd_dev->dd_rdonly)
878 /* We don't enforce quota until the qsd_instance is started */
879 read_lock(&qsd->qsd_lock);
880 if (!qsd->qsd_started) {
881 read_unlock(&qsd->qsd_lock);
884 read_unlock(&qsd->qsd_lock);
886 /* ignore block quota on MDTs, ignore inode quota on OSTs */
887 if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
888 (qsd->qsd_is_md && qi->lqi_is_blk))
891 /* ignore quota enforcement request when:
892 * - quota isn't enforced for this quota type
893 * or - the user/group is root
894 * or - quota accounting isn't enabled */
895 if (!qsd_type_enabled(qsd, qi->lqi_type) ||
896 (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
899 if (qi->lqi_type == PRJQUOTA && local_flags && qi->lqi_id.qid_projid &&
900 (qsd->qsd_root_prj_enable || !qi->lqi_ignore_root_proj_quota))
901 *local_flags |= QUOTA_FL_ROOT_PRJQUOTA;
903 LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
905 /* check whether we already allocated a slot for this id */
906 for (i = 0; i < trans->lqt_id_cnt; i++) {
907 if (qid_equal(qi, &trans->lqt_ids[i])) {
/* no free slot left in the transaction's id array */
914 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
915 CERROR("%s: more than %d qids enforced for a "
916 "transaction?\n", qsd->qsd_svname, i);
/* claim slot i for this id */
921 trans->lqt_ids[i].lqi_id = qi->lqi_id;
922 trans->lqt_ids[i].lqi_type = qi->lqi_type;
923 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
927 /* manage quota enforcement for this ID */
928 rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
929 &trans->lqt_ids[i], qi->lqi_space, local_flags);
932 EXPORT_SYMBOL(qsd_op_begin);
935 * Adjust quota space (by acquiring or releasing) hold by the quota slave.
936 * This function is called after each quota request completion and during
937 * reintegration in order to report usage or re-acquire quota locks.
938 * Space adjustment is aborted if there is already a quota request in flight
941 * \param env - the environment passed by the caller
942 * \param lqe - is the qid entry to be processed
944 * \retval 0 on success, appropriate errors on failure
946 int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
948 struct qsd_thread_info *qti = qsd_info(env);
949 struct quota_body *qbody = &qti->qti_body;
950 struct qsd_instance *qsd;
951 struct qsd_qtype_info *qqi;
956 memset(qbody, 0, sizeof(*qbody));
957 rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
959 /* add to adjust list again to trigger adjustment later when
961 LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
962 qsd_adjust_schedule(lqe, true, false);
969 if (qsd->qsd_dev->dd_rdonly)
974 /* fill qb_count & qb_flags */
975 if (!qsd_calc_adjust(lqe, qbody)) {
976 lqe_write_unlock(lqe);
977 LQUOTA_DEBUG(lqe, "no adjustment required");
981 /* only 1 quota request in flight for a given ID is allowed */
982 rc = qsd_request_enter(lqe);
984 /* already a request in flight, space adjustment will be run
985 * again on request completion */
986 lqe_write_unlock(lqe);
/* remember how much we're about to release so local acquires take
 * it into account (see qsd_acquire_local()) */
990 if (req_is_rel(qbody->qb_flags))
991 lqe->lqe_pending_rel = qbody->qb_count;
992 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
993 lqe_write_unlock(lqe);
995 /* hold a refcount until completion */
998 /* fill other quota body fields */
999 qbody->qb_fid = qqi->qqi_fid;
1000 qbody->qb_id = lqe->lqe_id;
1002 if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
1003 /* check whether we own a valid lock for this ID */
1004 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
1006 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
1007 if (req_is_preacq(qbody->qb_flags)) {
1008 if (req_has_rep(qbody->qb_flags))
1009 /* still want to report usage */
1010 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
1012 /* no pre-acquire if no per-ID lock */
1013 GOTO(out, rc = -ENOLCK);
1015 /* no lock found, should use intent */
1018 } else if (req_is_acq(qbody->qb_flags) &&
1019 qbody->qb_count == 0) {
1020 /* found cached lock, no need to acquire */
1024 /* release and report don't need a per-ID lock */
1025 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
1029 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
1030 qsd_req_completion, qqi, &qti->qti_lockh,
1033 struct lquota_lvb *lvb;
1037 GOTO(out, rc = -ENOMEM);
1039 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
1040 IT_QUOTA_DQACQ, qsd_req_completion,
1041 qqi, lvb, (void *)lqe);
1043 /* the completion function will be called by qsd_send_dqacq or
1044 * qsd_intent_lock */
/* error path: run completion by hand so pending_req/refcount drop */
1047 qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
1053 * Post quota operation, pre-acquire/release quota from master.
1055 * \param env - the environment passed by the caller
1056 * \param qsd - is the qsd instance attached to the OSD device which
1057 * is handling the operation.
1058 * \param qqi - is the qsd_qtype_info structure associated with the quota ID
1059 * subject to the operation
1060 * \param qid - stores information related to his ID for the operation
1061 * which has just completed
1063 * \retval 0 - success
1064 * \retval -ve - failure
1066 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
1067 struct lquota_id_info *qid)
1069 struct lquota_entry *lqe;
1073 lqe = qid->lqi_qentry;
/* detach the entry from the transaction slot; our reference to the
 * lqe is dropped at the end of this function */
1076 qid->lqi_qentry = NULL;
1078 /* refresh cached usage if a suitable environment is passed */
1080 qsd_refresh_usage(env, lqe);
1082 lqe_write_lock(lqe);
1083 if (qid->lqi_space > 0) {
/* pending write accounting must never go negative */
1084 if (lqe->lqe_pending_write < qid->lqi_space) {
1086 "More pending write quota to reduce (pending %llu, space %lld)\n",
1087 lqe->lqe_pending_write, qid->lqi_space);
1088 lqe->lqe_pending_write = 0;
1090 lqe->lqe_pending_write -= qid->lqi_space;
1094 adjust = qsd_adjust_needed(lqe);
1097 lqe_write_unlock(lqe);
1100 /* pre-acquire/release quota space is needed */
1102 qsd_adjust(env, lqe);
1104 /* no suitable environment, handle adjustment in
1105 * separate thread context */
1106 qsd_adjust_schedule(lqe, false, false);
1113 * Post quota operation. It's called after each operation transaction stopped.
1115 * \param env - the environment passed by the caller
1116 * \param qsd - is the qsd instance associated with device which is handling
1118 * \param qids - all qids information attached in the transaction handle
1119 * \param count - is the number of qid entries in the qids array.
1121 * \retval 0 - success
1122 * \retval -ve - failure
1124 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
1125 struct lquota_trans *trans)
1130 if (unlikely(qsd == NULL))
1133 if (qsd->qsd_dev->dd_rdonly)
1136 /* We don't enforce quota until the qsd_instance is started */
1137 read_lock(&qsd->qsd_lock);
1138 if (!qsd->qsd_started) {
1139 read_unlock(&qsd->qsd_lock);
1142 read_unlock(&qsd->qsd_lock);
1144 LASSERT(trans != NULL);
/* finish every id touched by this transaction */
1146 for (i = 0; i < trans->lqt_id_cnt; i++) {
1147 struct qsd_qtype_info *qqi;
1149 if (trans->lqt_ids[i].lqi_qentry == NULL)
1152 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
1153 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
1156 /* reset id_count to 0 so that a second accidental call to qsd_op_end()
1157 * does not result in failure */
1158 trans->lqt_id_cnt = 0;
1161 EXPORT_SYMBOL(qsd_op_end);
1163 /* Simple wrapper on top of qsd API which implement quota transfer for osd
1164 * setattr needs. As a reminder, only the root user can change ownership of
1165 * a file, that's why EDQUOT & EINPROGRESS errors are discarded
1167 int qsd_transfer(const struct lu_env *env, struct qsd_instance *qsd,
1168 struct lquota_trans *trans, unsigned int qtype,
1169 u64 orig_id, u64 new_id, u64 bspace,
1170 struct lquota_id_info *qi)
1177 LASSERT(qtype < LL_MAXQUOTAS);
1178 if (qtype == PRJQUOTA)
1179 if (!projid_valid(make_kprojid(&init_user_ns, new_id)))
1182 qi->lqi_type = qtype;
1184 /* inode accounting */
1185 qi->lqi_is_blk = false;
1187 /* one more inode for the new owner ... */
1188 qi->lqi_id.qid_uid = new_id;
1190 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
/* EDQUOT/EINPROGRESS are discarded: only root transfers ownership */
1191 if (rc == -EDQUOT || rc == -EINPROGRESS)
1196 /* and one less inode for the current id */
1197 qi->lqi_id.qid_uid = orig_id;
1199 /* can't get EDQUOT when reducing usage */
1200 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1201 if (rc == -EINPROGRESS)
1206 /* block accounting */
1207 qi->lqi_is_blk = true;
1209 /* more blocks for the new owner ... */
1210 qi->lqi_id.qid_uid = new_id;
1211 qi->lqi_space = bspace;
1212 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1213 if (rc == -EDQUOT || rc == -EINPROGRESS)
1218 /* and finally less blocks for the current owner */
1219 qi->lqi_id.qid_uid = orig_id;
1220 qi->lqi_space = -bspace;
1221 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1222 /* can't get EDQUOT when reducing usage */
1223 if (rc == -EINPROGRESS)
1227 EXPORT_SYMBOL(qsd_transfer);
1230 * Trigger pre-acquire/release if necessary.
1231 * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
1232 * quota accounting isn't updated when the transaction stopped. Instead, it'll
1233 * be updated on the final iput, so qsd_op_adjust() will be called then (in
1234 * osd_object_delete()) to trigger quota release if necessary.
1236 * \param env - the environment passed by the caller
1237 * \param qsd - is the qsd instance associated with the device in charge
1239 * \param qid - is the lquota ID of the user/group for which to trigger
1240 * quota space adjustment
1241 * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
1243 void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
1244 union lquota_id *qid, int qtype)
1246 struct lquota_entry *lqe;
1247 struct qsd_qtype_info *qqi;
1251 if (unlikely(qsd == NULL))
1254 /* We don't enforce quota until the qsd_instance is started */
1255 read_lock(&qsd->qsd_lock);
1256 if (!qsd->qsd_started) {
1257 read_unlock(&qsd->qsd_lock);
1260 read_unlock(&qsd->qsd_lock);
1262 qqi = qsd->qsd_type_array[qtype];
/* skip disabled types, missing accounting object, root id, or
 * read-only backend */
1265 if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
1266 qid->qid_uid == 0 || qsd->qsd_dev->dd_rdonly)
/* re-check started state; NOTE(review): duplicates the check above,
 * presumably guarding a race with qsd stop — confirm before removing */
1269 read_lock(&qsd->qsd_lock);
1270 if (!qsd->qsd_started) {
1271 read_unlock(&qsd->qsd_lock);
1274 read_unlock(&qsd->qsd_lock);
1276 lqe = lqe_locate(env, qqi->qqi_site, qid);
1278 CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
1279 qsd->qsd_svname, qid->qid_uid, qtype);
1283 qsd_refresh_usage(env, lqe);
1286 adjust = qsd_adjust_needed(lqe);
1287 lqe_read_unlock(lqe);
1290 qsd_adjust(env, lqe);
1295 EXPORT_SYMBOL(qsd_op_adjust);
1298 * Reserve or free quota.
1300 * Currently, It's used to reserve quota space before changing the file's group
1301 * for normal user and free the reserved quota after the group change.
1303 * \param env - the environment passed by the caller
1304 * \param qsd - is the qsd instance associated with the device in charge of
1306 * \param qi - qid & space required by current operation
1308 * \retval 0 - success
1309 * \retval -EDQUOT - out of quota
1310 * \retval -EINPROGRESS - inform client to retry write
1311 * \retval -ve - other appropriate errors
1313 int qsd_reserve_or_free_quota(const struct lu_env *env,
1314 struct qsd_instance *qsd,
1315 struct lquota_id_info *qi)
1317 struct qsd_qtype_info *qqi;
/* negative lqi_space means "free previously reserved quota" */
1318 bool is_free = qi->lqi_space < 0;
1323 if (unlikely(qsd == NULL))
1326 if (qsd->qsd_dev->dd_rdonly)
/* work with a positive amount from here on */
1330 qi->lqi_space *= -1;
1332 /* We don't enforce quota until the qsd_instance is started */
1333 read_lock(&qsd->qsd_lock);
1334 if (!qsd->qsd_started) {
1335 read_unlock(&qsd->qsd_lock);
1338 read_unlock(&qsd->qsd_lock);
1340 qqi = qsd->qsd_type_array[qi->lqi_type];
1343 CDEBUG(D_QUOTA, "type %s, acct %s, free %d, count %llu\n",
1344 qsd_type_enabled(qsd, qi->lqi_type) ? "enabled" : "disabled",
1345 (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed ? "failed" :
1346 "succeed", is_free, qi->lqi_space);
1348 /* ignore quota enforcement request when:
1349 * - quota isn't enforced for this quota type
1350 * or - the user/group is root
1351 * or - quota accounting isn't enabled
1354 (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
1355 (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed))
/* freeing: undo the reservation via the op_end path */
1359 qsd_op_end0(env, qsd->qsd_type_array[qi->lqi_type], qi);
1361 long long qspace = qi->lqi_space;
1363 /* the acquired quota will add to lqi_space in qsd_op_begin0 */
1365 rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], qi,
/* on failure, drop the lqe reference taken by qsd_op_begin0() */
1367 if (rc && qi->lqi_qentry) {
1368 lqe_putref(qi->lqi_qentry);
1369 qi->lqi_qentry = NULL;
1373 CDEBUG(D_QUOTA, "%s quota: type %i, uid %llu, gid %llu, space %llu\n",
1374 is_free ? "Free" : "Reserve", qi->lqi_type, qi->lqi_id.qid_uid,
1375 qi->lqi_id.qid_gid, qi->lqi_space);
1379 EXPORT_SYMBOL(qsd_reserve_or_free_quota);