4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2012, 2017, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
31 #define DEBUG_SUBSYSTEM S_LQUOTA
33 #include "qsd_internal.h"
36 * helper function bumping lqe_pending_req if there is no quota request in
37 * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
/* NOTE(review): callers (qsd_acquire_remote/qsd_adjust) invoke this while
 * holding the lqe write lock, so the check-and-increment below is atomic
 * with respect to concurrent requesters. */
39 static inline int qsd_request_enter(struct lquota_entry *lqe)
41 /* is there already a quota request in flight? */
42 if (lqe->lqe_pending_req != 0) {
43 LQUOTA_DEBUG(lqe, "already a request in flight");
/* sanity: with no request in flight there must be no pending release;
 * a non-zero lqe_pending_rel here indicates corrupted lqe state */
47 if (lqe->lqe_pending_rel != 0) {
48 LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
49 lqe->lqe_pending_rel);
/* record that one request is now in flight for this ID */
53 lqe->lqe_pending_req++;
58 * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
/* NOTE(review): also clears lqe_pending_rel and wakes any thread blocked in
 * the lqe_waiters queue (see wait_event_idle_timeout in qsd_op_begin0). */
60 static inline void qsd_request_exit(struct lquota_entry *lqe)
/* qsd_request_enter() only ever raises the counter from 0 to 1, so any
 * other value here is a logic error worth shouting about */
62 if (lqe->lqe_pending_req != 1) {
63 LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!")
66 lqe->lqe_pending_req--;
67 lqe->lqe_pending_rel = 0;
68 wake_up(&lqe->lqe_waiters);
72 * Check whether a qsd instance is all set to send quota request to master.
73 * This includes checking whether:
74 * - the connection to master is set up and usable,
75 * - the qsd isn't stopping
76 * - reintegration has been successfully completed and all indexes are
79 * \param lqe - is the lquota entry for which we would like to send an quota
81 * \param lockh - is the remote handle of the global lock returned on success
83 * \retval 0 on success, appropriate error on failure
85 static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
87 struct qsd_qtype_info *qqi = lqe2qqi(lqe);
88 struct qsd_instance *qsd = qqi->qqi_qsd;
89 struct obd_import *imp = NULL;
90 struct ldlm_lock *lock;
/* all state checks below are done under the qsd read lock so that
 * qsd_stopping/qsd_exp_valid and the lock handle are read consistently */
93 read_lock(&qsd->qsd_lock);
94 /* is the qsd about to shut down? */
95 if (qsd->qsd_stopping) {
96 read_unlock(&qsd->qsd_lock);
97 LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
98 /* Target is about to shut down, client will retry */
102 /* is the connection to the quota master ready? */
103 if (qsd->qsd_exp_valid)
104 imp = class_exp2cliimp(qsd->qsd_exp);
105 if (imp == NULL || imp->imp_invalid) {
106 read_unlock(&qsd->qsd_lock);
107 LQUOTA_DEBUG(lqe, "connection to master not ready");
111 /* In most case, reintegration must have been triggered (when enable
112 * quota or on OST start), however, in rare race condition (enabling
113 * quota when starting OSTs), we might miss triggering reintegration
116 * If the previous reintegration failed for some reason, we'll
117 * re-trigger it here as well. */
118 if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
119 read_unlock(&qsd->qsd_lock);
120 LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
121 "kicking off reintegration");
122 qsd_start_reint_thread(qqi);
123 RETURN(-EINPROGRESS);
126 /* Fill the remote global lock handle, master will check this handle
127 * to see if the slave is sending request with stale lock */
128 lustre_handle_copy(lockh, &qqi->qqi_lockh);
129 read_unlock(&qsd->qsd_lock);
131 if (!lustre_handle_is_used(lockh))
/* resolve the local handle to the lock so we can read its remote twin */
134 lock = ldlm_handle2lock(lockh);
138 /* return remote lock handle to be packed in quota request */
139 lustre_handle_copy(lockh, &lock->l_remote_handle);
146 * Check whether any quota space adjustment (pre-acquire/release/report) is
147 * needed for a given quota ID. If a non-null \a qbody is passed, then the
148 * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
149 * to be packed in the quota request.
151 * \param lqe - is the lquota entry for which we would like to adjust quota
153 * \param qbody - is the quota body to fill, if not NULL.
155 * \retval true - space adjustment is required and \a qbody is filled, if not
157 * \retval false - no space adjustment required
/* NOTE(review): callers are expected to hold the lqe lock while this reads
 * the usage/granted counters — TODO confirm against callers outside this
 * chunk. */
159 static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
161 __u64 usage, granted;
/* effective usage = on-disk usage + in-flight writes + queued writes */
164 usage = lqe->lqe_usage;
165 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
166 granted = lqe->lqe_granted;
171 if (!lqe->lqe_enforced) {
172 /* quota not enforced any more for this ID */
174 /* release all quota space unconditionally */
175 LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
177 qbody->qb_count = granted;
178 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
185 if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
186 /* No valid per-ID lock
187 * When reporting quota (during reintegration or on setquota
188 * glimpse), we should release granted space if usage is 0.
189 * Otherwise, if the usage is less than granted, we need to
190 * acquire the per-ID lock to make sure the unused grant can be
191 * reclaimed by per-ID lock glimpse. */
193 /* no on-disk usage and no outstanding activity, release
196 LQUOTA_DEBUG(lqe, "no usage, releasing all "
199 qbody->qb_count = granted;
200 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
204 LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
209 if (lqe->lqe_usage < lqe->lqe_granted) {
210 /* holding quota space w/o any lock, enqueue per-ID lock
212 LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
215 qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
220 if (lqe->lqe_usage > lqe->lqe_granted) {
221 /* quota overrun, report usage */
222 LQUOTA_DEBUG(lqe, "overrun, reporting usage");
224 qbody->qb_usage = lqe->lqe_usage;
225 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
229 LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
234 * Apply good old quota qunit adjustment logic which has been around
236 * 1. revoke all extra grant
238 if (lqe->lqe_revoke) {
244 LQUOTA_DEBUG(lqe, "revoke pre-acquired quota: %llu - %llu\n",
246 qbody->qb_count = granted - usage;
247 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
251 /* 2. release spare quota space? */
252 if (granted > usage + lqe->lqe_qunit) {
253 /* pre-release quota space */
256 qbody->qb_count = granted - usage;
257 /* if usage == 0, release all granted space */
259 /* try to keep one qunit of quota space */
260 qbody->qb_count -= lqe->lqe_qunit;
261 /* but don't release less than qtune to avoid releasing
263 if (qbody->qb_count < lqe->lqe_qtune)
264 qbody->qb_count = lqe->lqe_qtune;
266 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
270 /* 3. Any quota overrun? */
271 if (lqe->lqe_usage > lqe->lqe_granted) {
272 /* we overconsumed quota space, we report usage in request so
273 * that master can adjust it unconditionally */
276 qbody->qb_usage = lqe->lqe_usage;
277 granted = lqe->lqe_usage;
278 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
281 /* 4. Time to pre-acquire? */
282 if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
283 lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
284 /* To pre-acquire quota space, we report how much spare quota
285 * space the slave currently owns, then the master will grant us
286 * back how much we can pretend given the current state of
290 if (granted <= usage)
293 qbody->qb_count = granted - usage;
294 qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
/* any flag set above means an adjustment request must be sent */
299 RETURN(qbody->qb_flags != 0);
305 * Helper function returning true when quota space need to be adjusted (some
306 * unused space should be free or pre-acquire) and false otherwise.
/* Thin predicate wrapper: runs the full qsd_calc_adjust() decision logic
 * without filling any quota body (NULL qbody). */
308 static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
310 return qsd_calc_adjust(lqe, NULL);
314 * Callback function called when an acquire/release request sent to the master
/* NOTE(review): \a arg carries the lquota_entry; this runs in ptlrpcd
 * context, hence the lu_env refill below. "environmnent" in the error
 * message is a pre-existing typo in a runtime string, left untouched here. */
317 static void qsd_req_completion(const struct lu_env *env,
318 struct qsd_qtype_info *qqi,
319 struct quota_body *reqbody,
320 struct quota_body *repbody,
321 struct lustre_handle *lockh,
322 struct lquota_lvb *lvb,
325 struct lquota_entry *lqe = (struct lquota_entry *)arg;
326 struct qsd_thread_info *qti;
328 bool adjust = false, cancel = false;
331 LASSERT(qqi != NULL && lqe != NULL);
333 /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
335 rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
337 LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
339 /* can't afford to adjust quota space with no suitable lu_env */
340 GOTO(out_noadjust, rc);
345 LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
348 /* despite -EDQUOT & -EINPROGRESS errors, the master might still
349 * grant us back quota space to adjust quota overrun */
350 if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
351 if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
352 ret != -ESHUTDOWN && ret != -EAGAIN)
353 /* print errors only if return code is unexpected */
354 LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
355 ret, reqbody->qb_flags);
359 /* Set the lqe_lockh */
360 if (lustre_handle_is_used(lockh) &&
361 !lustre_handle_equal(lockh, &lqe->lqe_lockh))
362 lustre_handle_copy(&lqe->lqe_lockh, lockh);
364 /* If the replied qb_count is zero, it means master didn't process
365 * the DQACQ since the limit for this ID has been removed, so we
366 * should not update quota entry & slave index copy neither. */
367 if (repbody != NULL && repbody->qb_count != 0) {
368 LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
370 if (lqe->lqe_is_reset) {
371 lqe->lqe_granted = 0;
372 } else if (req_is_rel(reqbody->qb_flags)) {
/* releasing: never let lqe_granted underflow; clamp to 0 and complain */
373 if (lqe->lqe_granted < repbody->qb_count) {
374 LQUOTA_ERROR(lqe, "can't release more space "
375 "than owned %llu<%llu",
378 lqe->lqe_granted = 0;
380 lqe->lqe_granted -= repbody->qb_count;
382 /* Cancel the per-ID lock initiatively when there
383 * isn't any usage & grant, which can avoid master
384 * sending glimpse unnecessarily to this slave on
386 if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
387 !lqe->lqe_waiting_write && !lqe->lqe_usage)
/* acquiring: master granted extra space, account it locally */
390 lqe->lqe_granted += repbody->qb_count;
392 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
393 lqe_write_unlock(lqe);
395 /* Update the slave index file in the dedicated thread. So far,
396 * We don't update the version of slave index copy on DQACQ.
397 * No locking is necessary since nobody can change
398 * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
399 if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_GRANT))
400 qti->qti_rec.lqr_slv_rec.qsr_granted =
401 0xFFFFFFFFFFDEC80CULL;
402 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
407 /* extract information from lvb */
408 if (ret == 0 && lvb != NULL) {
409 if (lvb->lvb_id_qunit != 0)
410 qsd_set_qunit(lqe, lvb->lvb_id_qunit);
411 qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
412 } else if (repbody != NULL && repbody->qb_qunit != 0) {
413 qsd_set_qunit(lqe, repbody->qb_qunit);
416 /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
417 * flooding the master with acquire request. Pre-acquire will be turned
418 * on again as soon as qunit is modified */
419 if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
420 lqe->lqe_nopreacq = true;
/* decide whether a follow-up adjustment is required before dropping lock */
422 adjust = qsd_adjust_needed(lqe);
423 if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
/* cache the acquire result so qsd_acquire_remote() can short-circuit
 * repeated acquires for ~1 second (see lqe_acq_time check there) */
424 lqe->lqe_acq_rc = ret;
425 lqe->lqe_acq_time = ktime_get_seconds();
428 qsd_request_exit(lqe);
429 lqe_write_unlock(lqe);
431 /* release reference on per-ID lock */
432 if (lustre_handle_is_used(lockh))
433 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
436 qsd_adjust_schedule(lqe, false, true);
438 if (!ret || ret == -EDQUOT)
439 qsd_adjust_schedule(lqe, false, false);
441 qsd_adjust_schedule(lqe, true, false);
451 * Try to consume local quota space.
453 * \param lqe - is the qid entry to be processed
454 * \param space - is the amount of quota space needed to complete the operation
456 * \retval 0 - success
457 * \retval -EDQUOT - out of quota
458 * \retval -EAGAIN - need to acquire space from master
460 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
466 if (!lqe->lqe_enforced)
467 /* not enforced any more, we are good */
471 /* use latest usage */
472 usage = lqe->lqe_usage;
473 /* take pending write into account */
474 usage += lqe->lqe_pending_write;
/* enough locally granted space (net of pending releases)? then just move
 * the request from "waiting" to "pending" accounting */
476 if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
477 /* Yay! we got enough space */
478 lqe->lqe_pending_write += space;
479 lqe->lqe_waiting_write -= space;
481 /* lqe_edquot flag is used to avoid flooding dqacq requests when
482 * the user is over quota, however, the lqe_edquot could be stale
483 * sometimes due to the race reply of dqacq vs. id lock glimpse
484 * (see LU-4505), so we revalidate it every 5 seconds. */
485 } else if (lqe->lqe_edquot &&
486 (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
491 lqe_write_unlock(lqe);
497 * Compute how much quota space should be acquire from the master based
498 * on how much is currently granted to this slave and pending/waiting
501 * \param lqe - is the lquota entry for which we would like to adjust quota
503 * \param qbody - is the quota body of the acquire request to fill
505 * \retval true - space acquisition is needed and qbody is filled
506 * \retval false - no space acquisition required
508 static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
509 struct quota_body *qbody)
511 __u64 usage, granted;
/* effective usage = on-disk usage + in-flight writes + queued writes */
513 usage = lqe->lqe_usage;
514 usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
515 granted = lqe->lqe_granted;
519 /* if we overconsumed quota space, we report usage in request so that
520 * master can adjust it unconditionally */
521 if (lqe->lqe_usage > lqe->lqe_granted) {
522 qbody->qb_usage = lqe->lqe_usage;
523 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
524 granted = lqe->lqe_usage;
527 /* acquire as much as needed, but not more */
528 if (usage > granted) {
529 qbody->qb_count = usage - granted;
530 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
/* non-zero flags mean a request (report and/or acquire) must be sent */
533 return qbody->qb_flags != 0;
537 * Acquire quota space from master.
538 * There are at most 1 in-flight dqacq/dqrel.
540 * \param env - the environment passed by the caller
541 * \param lqe - is the qid entry to be processed
543 * \retval 0 - success
544 * \retval -EDQUOT - out of quota
545 * \retval -EINPROGRESS - inform client to retry write/create
546 * \retval -EBUSY - already a quota request in flight
547 * \retval -ve - other appropriate errors
549 static int qsd_acquire_remote(const struct lu_env *env,
550 struct lquota_entry *lqe)
552 struct qsd_thread_info *qti = qsd_info(env);
553 struct quota_body *qbody = &qti->qti_body;
554 struct qsd_instance *qsd;
555 struct qsd_qtype_info *qqi;
/* build the request body from scratch; qsd_ready() also fills in the
 * global lock handle the master uses to detect stale slaves */
559 memset(qbody, 0, sizeof(*qbody));
560 rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
569 /* is quota really enforced for this id? */
570 if (!lqe->lqe_enforced) {
571 lqe_write_unlock(lqe);
572 LQUOTA_DEBUG(lqe, "quota not enforced any more");
576 /* fill qb_count & qb_flags */
577 if (!qsd_calc_acquire(lqe, qbody)) {
578 lqe_write_unlock(lqe);
579 LQUOTA_DEBUG(lqe, "No acquire required");
583 /* check whether an acquire request completed recently */
/* short-circuit with the cached result (set in qsd_req_completion) if an
 * acquire finished within the last second, to avoid hammering the master */
584 if (lqe->lqe_acq_rc != 0 &&
585 lqe->lqe_acq_time > ktime_get_seconds() - 1) {
586 lqe_write_unlock(lqe);
587 LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
588 RETURN(lqe->lqe_acq_rc);
591 /* only 1 quota request in flight for a given ID is allowed */
592 rc = qsd_request_enter(lqe);
594 lqe_write_unlock(lqe);
598 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
599 lqe_write_unlock(lqe);
601 /* hold a refcount until completion */
604 /* fill other quota body fields */
605 qbody->qb_fid = qqi->qqi_fid;
606 qbody->qb_id = lqe->lqe_id;
608 /* check whether we already own a valid lock for this ID */
609 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
611 struct lquota_lvb *lvb;
/* allocation (or similar) failure path: run the completion callback
 * directly so in-flight accounting and refcounts are unwound */
616 qsd_req_completion(env, qqi, qbody, NULL,
617 &qti->qti_lockh, NULL, lqe, rc);
620 /* no lock found, should use intent */
621 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
622 IT_QUOTA_DQACQ, qsd_req_completion,
623 qqi, lvb, (void *)lqe);
625 /* lock found, should use regular dqacq */
626 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
627 qsd_req_completion, qqi, &qti->qti_lockh,
631 /* the completion function will be called by qsd_send_dqacq or
637 * Acquire \a space of quota space in order to complete an operation.
638 * Try to consume local quota space first and send acquire request to quota
639 * master if required.
641 * \param env - the environment passed by the caller
642 * \param lqe - is the qid entry to be processed
643 * \param space - is the amount of quota required for the operation
644 * \param ret - is the return code (-EDQUOT, -EINPROGRESS, ...)
646 * \retval true - stop waiting in wait_event_idle_timeout,
647 * and real return value in \a ret
648 * \retval false - continue waiting
/* NOTE(review): used as the condition of wait_event_idle_timeout() in
 * qsd_op_begin0(), so it may be re-evaluated many times for one operation. */
650 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
651 long long space, int *ret)
654 int wait_pending = 0;
655 struct qsd_qtype_info *qqi = lqe2qqi(lqe);
/* keep retrying the local-then-remote sequence until a definitive result */
659 for (count = 0; rc == 0; count++) {
660 LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
/* bail out early if the whole qsd instance is shutting down */
662 if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
667 /* refresh disk usage */
668 rc = qsd_refresh_usage(env, lqe);
672 /* try to consume local quota space first */
673 rc = qsd_acquire_local(lqe, space);
675 /* rc == 0, Wouhou! enough local quota space
676 * rc < 0, something bad happened */
679 * There might be a window that commit transaction
680 * have updated usage but pending write doesn't change
681 * wait for it before acquiring remotely.
/* one-shot dt_sync() to flush committed-but-unaccounted writes before
 * resorting to a remote acquire */
683 if (lqe->lqe_pending_write >= space && !wait_pending) {
685 dt_sync(env, qqi->qqi_qsd->qsd_dev);
689 /* if we have gotten some quota and stil wait more quota,
690 * it's better to give QMT some time to reclaim from clients */
692 schedule_timeout_interruptible(cfs_time_seconds(1));
694 /* need to acquire more quota space from master */
695 rc = qsd_acquire_remote(env, lqe);
699 /* already a request in flight, continue waiting */
706 * Quota enforcement handler. If local quota can satisfy this operation,
707 * return success, otherwise, acquire more quota from master.
708 * (for write operation, if master isn't available at this moment, return
709 * -EINPROGRESS to inform client to retry the write)
711 * \param env - the environment passed by the caller
712 * \param qsd - is the qsd instance associated with the device in charge
714 * \param qid - is the qid information attached in the transaction handle
715 * \param space - is the space required by the operation
716 * \param flags - if the operation is write, return caller no user/group
717 * and sync commit flags
719 * \retval 0 - success
720 * \retval -EDQUOT - out of quota
721 * \retval -EINPROGRESS - inform client to retry write
722 * \retval -ve - other appropriate errors
724 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
725 struct lquota_id_info *qid, long long space,
726 enum osd_quota_local_flags *local_flags)
728 struct lquota_entry *lqe;
729 enum osd_quota_local_flags qtype_flag = 0;
730 int rc, ret = -EINPROGRESS;
733 if (qid->lqi_qentry != NULL) {
734 /* we already had to deal with this id for this transaction */
735 lqe = qid->lqi_qentry;
736 if (!lqe->lqe_enforced)
739 /* look up lquota entry associated with qid */
740 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
742 RETURN(PTR_ERR(lqe));
743 if (!lqe->lqe_enforced) {
747 qid->lqi_qentry = lqe;
748 /* lqe will be released in qsd_op_end() */
752 /* when space is negative or null, we don't need to consume
753 * quota space. That said, we still want to perform space
754 * adjustments in qsd_op_end, so we return here, but with
755 * a reference on the lqe */
756 if (local_flags != NULL) {
757 rc = qsd_refresh_usage(env, lqe);
763 LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
/* queue the requested space; qsd_acquire() will move it from waiting to
 * pending once granted */
766 lqe->lqe_waiting_write += space;
767 lqe_write_unlock(lqe);
769 /* acquire quota space for the operation, cap overall wait time to
770 * prevent a service thread from being stuck for too long */
771 rc = wait_event_idle_timeout(
772 lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
773 cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
775 if (rc > 0 && ret == 0) {
776 qid->lqi_space += space;
/* failure path: undo the waiting_write accounting done above */
784 LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
787 lqe->lqe_waiting_write -= space;
789 if (local_flags && lqe->lqe_pending_write != 0)
790 /* Inform OSD layer that there are pending writes.
791 * It might want to retry after a sync if appropriate */
792 *local_flags |= QUOTA_FL_SYNC;
793 lqe_write_unlock(lqe);
795 /* convert recoverable error into -EINPROGRESS, client will
797 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
798 rc == -EAGAIN || rc == -EINTR) {
800 } else if (rc == -ESRCH) {
802 LQUOTA_ERROR(lqe, "ID isn't enforced on master, it "
803 "probably due to a legeal race, if this "
804 "message is showing up constantly, there "
805 "could be some inconsistence between "
806 "master & slave, and quota reintegration "
807 "needs be re-triggered.");
811 if (local_flags != NULL) {
813 LASSERT(qid->lqi_is_blk);
815 *local_flags |= lquota_over_fl(qqi->qqi_qtype);
/* compute worst-case usage to decide whether the client must be told
 * to switch to sync writes (see threshold logic below) */
820 usage = lqe->lqe_pending_write;
821 usage += lqe->lqe_waiting_write;
822 /* There is a chance to successfully grant more quota
823 * but get edquot flag through glimpse. */
824 if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
825 (usage % lqe->lqe_qunit >
826 qqi->qqi_qsd->qsd_sync_threshold)))
827 usage += qqi->qqi_qsd->qsd_sync_threshold;
829 usage += lqe->lqe_usage;
831 qtype_flag = lquota_over_fl(qqi->qqi_qtype);
832 /* if we should notify client to start sync write */
833 if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
834 *local_flags |= qtype_flag;
836 *local_flags &= ~qtype_flag;
837 lqe_read_unlock(lqe);
844 * helper function comparing two lquota_id_info structures
/* Two entries match only when block/inode kind, quota type and numeric ID
 * all agree — used to dedup IDs within one transaction in qsd_op_begin(). */
846 static inline bool qid_equal(struct lquota_id_info *q1,
847 struct lquota_id_info *q2)
849 if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
851 return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
855 * Enforce quota, it's called in the declaration of each operation.
856 * qsd_op_end() will then be called later once all the operations have been
857 * completed in order to release/adjust the quota space.
859 * \param env - the environment passed by the caller
860 * \param qsd - is the qsd instance associated with the device in charge of
862 * \param trans - is the quota transaction information
863 * \param qi - qid & space required by current operation
864 * \param flags - if the operation is write, return caller no user/group and
867 * \retval 0 - success
868 * \retval -EDQUOT - out of quota
869 * \retval -EINPROGRESS - inform client to retry write
870 * \retval -ve - other appropriate errors
872 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
873 struct lquota_trans *trans, struct lquota_id_info *qi,
874 enum osd_quota_local_flags *local_flags)
880 /* fast path, ignore quota enforcement request for root owned files */
881 if (qi->lqi_id.qid_uid == 0)
884 if (unlikely(qsd == NULL))
/* read-only device: nothing can consume space, skip enforcement */
887 if (qsd->qsd_dev->dd_rdonly)
890 /* We don't enforce quota until the qsd_instance is started */
891 read_lock(&qsd->qsd_lock);
892 if (!qsd->qsd_started) {
893 read_unlock(&qsd->qsd_lock);
896 read_unlock(&qsd->qsd_lock);
898 /* ignore block quota on MDTs, ignore inode quota on OSTs */
899 if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
900 (qsd->qsd_is_md && qi->lqi_is_blk))
903 /* ignore quota enforcement request when:
904 * - quota isn't enforced for this quota type
905 * or - the user/group is root
906 * or - quota accounting isn't enabled */
907 if (!qsd_type_enabled(qsd, qi->lqi_type) ||
908 (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
911 if (local_flags && qi->lqi_id.qid_projid && qsd->qsd_root_prj_enable)
912 *local_flags |= QUOTA_FL_ROOT_PRJQUOTA;
914 LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
916 /* check whether we already allocated a slot for this id */
917 for (i = 0; i < trans->lqt_id_cnt; i++) {
918 if (qid_equal(qi, &trans->lqt_ids[i])) {
925 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
926 CERROR("%s: more than %d qids enforced for a "
927 "transaction?\n", qsd->qsd_svname, i);
/* new slot: record identity of this ID in the transaction */
932 trans->lqt_ids[i].lqi_id = qi->lqi_id;
933 trans->lqt_ids[i].lqi_type = qi->lqi_type;
934 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
938 /* manage quota enforcement for this ID */
939 rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
940 &trans->lqt_ids[i], qi->lqi_space, local_flags);
943 EXPORT_SYMBOL(qsd_op_begin);
946 * Adjust quota space (by acquiring or releasing) hold by the quota slave.
947 * This function is called after each quota request completion and during
948 * reintegration in order to report usage or re-acquire quota locks.
949 * Space adjustment is aborted if there is already a quota request in flight
952 * \param env - the environment passed by the caller
953 * \param lqe - is the qid entry to be processed
955 * \retval 0 on success, appropriate errors on failure
957 int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
959 struct qsd_thread_info *qti = qsd_info(env);
960 struct quota_body *qbody = &qti->qti_body;
961 struct qsd_instance *qsd;
962 struct qsd_qtype_info *qqi;
967 memset(qbody, 0, sizeof(*qbody));
968 rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
970 /* add to adjust list again to trigger adjustment later when
972 LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
973 qsd_adjust_schedule(lqe, true, false);
/* no adjustments possible on a read-only backend */
980 if (qsd->qsd_dev->dd_rdonly)
985 /* fill qb_count & qb_flags */
986 if (!qsd_calc_adjust(lqe, qbody)) {
987 lqe_write_unlock(lqe);
988 LQUOTA_DEBUG(lqe, "no adjustment required");
992 /* only 1 quota request in flight for a given ID is allowed */
993 rc = qsd_request_enter(lqe);
995 /* already a request in flight, space adjustment will be run
996 * again on request completion */
997 lqe_write_unlock(lqe);
/* remember how much is being released; qsd_acquire_local() subtracts
 * lqe_pending_rel from granted while the release is in flight */
1001 if (req_is_rel(qbody->qb_flags))
1002 lqe->lqe_pending_rel = qbody->qb_count;
1003 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
1004 lqe_write_unlock(lqe);
1006 /* hold a refcount until completion */
1009 /* fill other quota body fields */
1010 qbody->qb_fid = qqi->qqi_fid;
1011 qbody->qb_id = lqe->lqe_id;
1013 if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
1014 /* check whether we own a valid lock for this ID */
1015 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
1017 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
1018 if (req_is_preacq(qbody->qb_flags)) {
1019 if (req_has_rep(qbody->qb_flags))
1020 /* still want to report usage */
1021 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
1023 /* no pre-acquire if no per-ID lock */
1024 GOTO(out, rc = -ENOLCK);
1026 /* no lock found, should use intent */
1029 } else if (req_is_acq(qbody->qb_flags) &&
1030 qbody->qb_count == 0) {
1031 /* found cached lock, no need to acquire */
1035 /* release and report don't need a per-ID lock */
1036 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
1040 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
1041 qsd_req_completion, qqi, &qti->qti_lockh,
1044 struct lquota_lvb *lvb;
1048 GOTO(out, rc = -ENOMEM);
1050 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
1051 IT_QUOTA_DQACQ, qsd_req_completion,
1052 qqi, lvb, (void *)lqe);
1054 /* the completion function will be called by qsd_send_dqacq or
1055 * qsd_intent_lock */
/* error path: invoke the completion callback ourselves so the in-flight
 * request accounting and references are properly unwound */
1058 qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
1064 * Post quota operation, pre-acquire/release quota from master.
1066 * \param env - the environment passed by the caller
1067 * \param qsd - is the qsd instance attached to the OSD device which
1068 * is handling the operation.
1069 * \param qqi - is the qsd_qtype_info structure associated with the quota ID
1070 * subject to the operation
1071 * \param qid - stores information related to his ID for the operation
1072 * which has just completed
1074 * \retval 0 - success
1075 * \retval -ve - failure
1077 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
1078 struct lquota_id_info *qid)
1080 struct lquota_entry *lqe;
1084 lqe = qid->lqi_qentry;
/* detach the lqe from the transaction slot; reference dropped below */
1087 qid->lqi_qentry = NULL;
1089 /* refresh cached usage if a suitable environment is passed */
1091 qsd_refresh_usage(env, lqe);
1093 lqe_write_lock(lqe);
1094 if (qid->lqi_space > 0) {
/* undo the pending-write accounting made in qsd_op_begin0(); clamp to
 * zero and complain if more is released than was recorded */
1095 if (lqe->lqe_pending_write < qid->lqi_space) {
1097 "More pending write quota to reduce (pending %llu, space %lld)\n",
1098 lqe->lqe_pending_write, qid->lqi_space);
1099 lqe->lqe_pending_write = 0;
1101 lqe->lqe_pending_write -= qid->lqi_space;
1105 adjust = qsd_adjust_needed(lqe);
1108 lqe_write_unlock(lqe);
1111 /* pre-acquire/release quota space is needed */
1113 qsd_adjust(env, lqe);
1115 /* no suitable environment, handle adjustment in
1116 * separate thread context */
1117 qsd_adjust_schedule(lqe, false, false);
1124 * Post quota operation. It's called after each operation transaction stopped.
1126 * \param env - the environment passed by the caller
1127 * \param qsd - is the qsd instance associated with device which is handling
1129 * \param qids - all qids information attached in the transaction handle
1130 * \param count - is the number of qid entries in the qids array.
1132 * \retval 0 - success
1133 * \retval -ve - failure
1135 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
1136 struct lquota_trans *trans)
1141 if (unlikely(qsd == NULL))
1144 if (qsd->qsd_dev->dd_rdonly)
1147 /* We don't enforce quota until the qsd_instance is started */
1148 read_lock(&qsd->qsd_lock);
1149 if (!qsd->qsd_started) {
1150 read_unlock(&qsd->qsd_lock);
1153 read_unlock(&qsd->qsd_lock);
1155 LASSERT(trans != NULL);
/* finish every ID touched by this transaction, skipping empty slots */
1157 for (i = 0; i < trans->lqt_id_cnt; i++) {
1158 struct qsd_qtype_info *qqi;
1160 if (trans->lqt_ids[i].lqi_qentry == NULL)
1163 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
1164 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
1167 /* reset id_count to 0 so that a second accidental call to qsd_op_end()
1168 * does not result in failure */
1169 trans->lqt_id_cnt = 0;
1172 EXPORT_SYMBOL(qsd_op_end);
1174 /* Simple wrapper on top of qsd API which implement quota transfer for osd
1175 * setattr needs. As a reminder, only the root user can change ownership of
1176 * a file, that's why EDQUOT & EINPROGRESS errors are discarded
/* NOTE(review): performs four qsd_op_begin() calls on the same transaction:
 * +1 inode / -1 inode and +bspace / -bspace, for new_id and orig_id
 * respectively. */
1178 int qsd_transfer(const struct lu_env *env, struct qsd_instance *qsd,
1179 struct lquota_trans *trans, unsigned int qtype,
1180 u64 orig_id, u64 new_id, u64 bspace,
1181 struct lquota_id_info *qi)
1188 LASSERT(qtype < LL_MAXQUOTAS);
/* project IDs must map to a valid kernel projid before being charged */
1189 if (qtype == PRJQUOTA)
1190 if (!projid_valid(make_kprojid(&init_user_ns, new_id)))
1193 qi->lqi_type = qtype;
1195 /* inode accounting */
1196 qi->lqi_is_blk = false;
1198 /* one more inode for the new owner ... */
1199 qi->lqi_id.qid_uid = new_id;
1201 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1202 if (rc == -EDQUOT || rc == -EINPROGRESS)
1207 /* and one less inode for the current id */
1208 qi->lqi_id.qid_uid = orig_id;
1210 /* can't get EDQUOT when reducing usage */
1211 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1212 if (rc == -EINPROGRESS)
1217 /* block accounting */
1218 qi->lqi_is_blk = true;
1220 /* more blocks for the new owner ... */
1221 qi->lqi_id.qid_uid = new_id;
1222 qi->lqi_space = bspace;
1223 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1224 if (rc == -EDQUOT || rc == -EINPROGRESS)
1229 /* and finally less blocks for the current owner */
1230 qi->lqi_id.qid_uid = orig_id;
1231 qi->lqi_space = -bspace;
1232 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1233 /* can't get EDQUOT when reducing usage */
1234 if (rc == -EINPROGRESS)
1238 EXPORT_SYMBOL(qsd_transfer);
1241 * Trigger pre-acquire/release if necessary.
1242 * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
1243 * quota accounting isn't updated when the transaction stopped. Instead, it'll
1244 * be updated on the final iput, so qsd_op_adjust() will be called then (in
1245 * osd_object_delete()) to trigger quota release if necessary.
1247 * \param env - the environment passed by the caller
1248 * \param qsd - is the qsd instance associated with the device in charge
1250 * \param qid - is the lquota ID of the user/group for which to trigger
1251 * quota space adjustment
1252 * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
1254 void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
1255 union lquota_id *qid, int qtype)
1257 struct lquota_entry *lqe;
1258 struct qsd_qtype_info *qqi;
1262 if (unlikely(qsd == NULL))
1265 /* We don't enforce quota until the qsd_instance is started */
1266 read_lock(&qsd->qsd_lock);
1267 if (!qsd->qsd_started) {
1268 read_unlock(&qsd->qsd_lock);
1271 read_unlock(&qsd->qsd_lock);
1273 qqi = qsd->qsd_type_array[qtype];
/* skip when the type is disabled, accounting isn't set up, the ID is
 * root, or the backend is read-only — nothing to adjust in those cases */
1276 if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
1277 qid->qid_uid == 0 || qsd->qsd_dev->dd_rdonly)
/* re-check started state; it may have changed since the check above —
 * presumably guarding against concurrent shutdown (TODO confirm) */
1280 read_lock(&qsd->qsd_lock);
1281 if (!qsd->qsd_started) {
1282 read_unlock(&qsd->qsd_lock);
1285 read_unlock(&qsd->qsd_lock);
1287 lqe = lqe_locate(env, qqi->qqi_site, qid);
1289 CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
1290 qsd->qsd_svname, qid->qid_uid, qtype);
1294 qsd_refresh_usage(env, lqe);
1297 adjust = qsd_adjust_needed(lqe);
1298 lqe_read_unlock(lqe);
1301 qsd_adjust(env, lqe);
1309 * Reserve or free quota.
1311 * Currently, It's used to reserve quota space before changing the file's group
1312 * for normal user and free the reserved quota after the group change.
1314 * \param env - the environment passed by the caller
1315 * \param qsd - is the qsd instance associated with the device in charge of
1317 * \param qi - qid & space required by current operation
1319 * \retval 0 - success
1320 * \retval -EDQUOT - out of quota
1321 * \retval -EINPROGRESS - inform client to retry write
1322 * \retval -ve - other appropriate errors
1324 int qsd_reserve_or_free_quota(const struct lu_env *env,
1325 struct qsd_instance *qsd,
1326 struct lquota_id_info *qi)
1328 struct qsd_qtype_info *qqi;
/* negative lqi_space means "free previously reserved quota" */
1329 bool is_free = qi->lqi_space < 0;
1334 if (unlikely(qsd == NULL))
1337 if (qsd->qsd_dev->dd_rdonly)
/* normalise to a positive amount for the free path */
1341 qi->lqi_space *= -1;
1343 /* We don't enforce quota until the qsd_instance is started */
1344 read_lock(&qsd->qsd_lock);
1345 if (!qsd->qsd_started) {
1346 read_unlock(&qsd->qsd_lock);
1349 read_unlock(&qsd->qsd_lock);
1351 qqi = qsd->qsd_type_array[qi->lqi_type];
1354 CDEBUG(D_QUOTA, "type %s, acct %s, free %d, count %llu\n",
1355 qsd_type_enabled(qsd, qi->lqi_type) ? "enabled" : "disabled",
1356 (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed ? "failed" :
1357 "succeed", is_free, qi->lqi_space);
1359 /* ignore quota enforcement request when:
1360 * - quota isn't enforced for this quota type
1361 * or - the user/group is root
1362 * or - quota accounting isn't enabled
1365 (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
1366 (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed))
/* free: run the end-of-op path to return the reserved space */
1370 qsd_op_end0(env, qsd->qsd_type_array[qi->lqi_type], qi);
1372 long long qspace = qi->lqi_space;
1374 /* the acquired quota will add to lqi_space in qsd_op_begin0 */
1376 rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], qi,
/* on failure, drop the lqe reference taken by qsd_op_begin0() since no
 * matching qsd_op_end() will run for this reservation */
1378 if (rc && qi->lqi_qentry) {
1379 lqe_putref(qi->lqi_qentry);
1380 qi->lqi_qentry = NULL;
1384 CDEBUG(D_QUOTA, "%s quota: type %i, uid %llu, gid %llu, space %llu\n",
1385 is_free ? "Free" : "Reserve", qi->lqi_type, qi->lqi_id.qid_uid,
1386 qi->lqi_id.qid_gid, qi->lqi_space);