Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include "qsd_internal.h"
34
35 /**
36  * helper function bumping lqe_pending_req if there is no quota request in
37  * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
38  */
39 static inline int qsd_request_enter(struct lquota_entry *lqe)
40 {
41         /* is there already a quota request in flight? */
42         if (lqe->lqe_pending_req != 0) {
43                 LQUOTA_DEBUG(lqe, "already a request in flight");
44                 return -EBUSY;
45         }
46
47         if (lqe->lqe_pending_rel != 0) {
48                 LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
49                              lqe->lqe_pending_rel);
50                 LBUG();
51         }
52
53         lqe->lqe_pending_req++;
54         return 0;
55 }
56
57 /**
58  * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
59  */
60 static inline void qsd_request_exit(struct lquota_entry *lqe)
61 {
62         if (lqe->lqe_pending_req != 1) {
63                 LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
64                 LBUG();
65         }
66         lqe->lqe_pending_req--;
67         lqe->lqe_pending_rel = 0;
68         wake_up(&lqe->lqe_waiters);
69 }
70
/**
 * Check whether a qsd instance is all set to send a quota request to master.
 * This includes checking whether:
 * - the connection to master is set up and usable,
 * - the qsd isn't stopping
 * - reintegration has been successfully completed and all indexes are
 *   up-to-date
 *
 * \param lqe - is the lquota entry for which we would like to send a quota
 *              request
 * \param lockh - is the remote handle of the global lock returned on success
 *
 * \retval 0 on success, appropriate error on failure
 */
static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
{
        struct qsd_qtype_info   *qqi = lqe2qqi(lqe);
        struct qsd_instance     *qsd = qqi->qqi_qsd;
        struct obd_import       *imp = NULL;
        struct ldlm_lock        *lock;
        ENTRY;

        /* all checks below are done under qsd_lock so that qsd state can't
         * change while we look at it; every exit path must unlock */
        read_lock(&qsd->qsd_lock);
        /* is the qsd about to shut down? */
        if (qsd->qsd_stopping) {
                read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
                /* Target is about to shut down, client will retry */
                RETURN(-EINPROGRESS);
        }

        /* is the connection to the quota master ready? */
        if (qsd->qsd_exp_valid)
                imp = class_exp2cliimp(qsd->qsd_exp);
        if (imp == NULL || imp->imp_invalid) {
                read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "connection to master not ready");
                RETURN(-ENOTCONN);
        }

        /* In most case, reintegration must have been triggered (when enable
         * quota or on OST start), however, in rare race condition (enabling
         * quota when starting OSTs), we might miss triggering reintegration
         * for some qqi.
         *
         * If the previous reintegration failed for some reason, we'll
         * re-trigger it here as well. */
        if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
                read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
                             "kicking off reintegration");
                qsd_start_reint_thread(qqi);
                RETURN(-EINPROGRESS);
        }

        /* Fill the remote global lock handle, master will check this handle
         * to see if the slave is sending request with stale lock */
        lustre_handle_copy(lockh, &qqi->qqi_lockh);
        read_unlock(&qsd->qsd_lock);

        /* global lock handle must be valid, otherwise the request would be
         * rejected by the master as stale */
        if (!lustre_handle_is_used(lockh))
                RETURN(-ENOLCK);

        /* the lock may have been cancelled since the handle was copied */
        lock = ldlm_handle2lock(lockh);
        if (lock == NULL)
                RETURN(-ENOLCK);

        /* return remote lock handle to be packed in quota request */
        lustre_handle_copy(lockh, &lock->l_remote_handle);
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}
144
/**
 * Check whether any quota space adjustment (pre-acquire/release/report) is
 * needed for a given quota ID. If a non-null \a qbody is passed, then the
 * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
 * to be packed in the quota request.
 *
 * Must be called with lqe write-locked (lqe fields are read and, for the
 * revoke case, written).
 *
 * \param lqe   - is the lquota entry for which we would like to adjust quota
 *                space.
 * \param qbody - is the quota body to fill, if not NULL. Passing NULL turns
 *                this into a side-effect-free "is adjustment needed?" probe
 *                (except for the lqe_revoke case, see below).
 *
 * \retval true  - space adjustment is required and \a qbody is filled, if not
 *                 NULL
 * \retval false - no space adjustment required
 */
static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
{
        __u64   usage, granted;
        ENTRY;

        /* effective usage includes writes in progress and writes waiting for
         * quota space */
        usage   = lqe->lqe_usage;
        usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
        granted = lqe->lqe_granted;

        if (qbody != NULL)
                qbody->qb_flags = 0;

        if (!lqe->lqe_enforced) {
                /* quota not enforced any more for this ID */
                if (granted != 0) {
                        /* release all quota space unconditionally */
                        LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
                        if (qbody != NULL) {
                                qbody->qb_count = granted;
                                qbody->qb_flags = QUOTA_DQACQ_FL_REL;
                        }
                        RETURN(true);
                }
                RETURN(false);
        }

        if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
                /* No valid per-ID lock
                 * When reporting quota (during reintegration or on setquota
                 * glimpse), we should release granted space if usage is 0.
                 * Otherwise, if the usage is less than granted, we need to
                 * acquire the per-ID lock to make sure the unused grant can be
                 * reclaimed by per-ID lock glimpse. */
                if (usage == 0) {
                        /* no on-disk usage and no outstanding activity, release
                         * space */
                        if (granted != 0) {
                                LQUOTA_DEBUG(lqe, "no usage, releasing all "
                                             "space");
                                if (qbody != NULL) {
                                        qbody->qb_count = granted;
                                        qbody->qb_flags = QUOTA_DQACQ_FL_REL;
                                }
                                RETURN(true);
                        }
                        LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
                                     "do");
                        RETURN(false);
                }

                if (lqe->lqe_usage < lqe->lqe_granted) {
                        /* holding quota space w/o any lock, enqueue per-ID lock
                         * again */
                        LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
                        if (qbody != NULL) {
                                qbody->qb_count = 0;
                                qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
                        }
                        RETURN(true);
                }

                if (lqe->lqe_usage > lqe->lqe_granted) {
                        /* quota overrun, report usage */
                        LQUOTA_DEBUG(lqe, "overrun, reporting usage");
                        if (qbody != NULL) {
                                qbody->qb_usage = lqe->lqe_usage;
                                qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
                        }
                        RETURN(true);
                }
                LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
                RETURN(false);
        }

        /* valid per-ID lock
         * Apply good old quota qunit adjustment logic which has been around
         * since lustre 1.4:
         * 1. revoke all extra grant
         */
        if (lqe->lqe_revoke) {
                if (qbody == NULL)
                        RETURN(true);

                /* lqe_revoke is consumed here; only the qbody != NULL
                 * (request-building) path clears it */
                lqe->lqe_revoke = 0;

                LQUOTA_DEBUG(lqe, "revoke pre-acquired quota: %llu - %llu\n",
                             granted, usage);
                qbody->qb_count = granted - usage;
                qbody->qb_flags = QUOTA_DQACQ_FL_REL;
                RETURN(true);
        }

        /* 2. release spare quota space? */
        if (granted > usage + lqe->lqe_qunit) {
                /* pre-release quota space */
                if (qbody == NULL)
                        RETURN(true);
                qbody->qb_count = granted - usage;
                /* if usage == 0, release all granted space */
                if (usage) {
                        /* try to keep one qunit of quota space */
                        qbody->qb_count -= lqe->lqe_qunit;
                        /* but don't release less than qtune to avoid releasing
                         * space too often */
                        if (qbody->qb_count < lqe->lqe_qtune)
                                qbody->qb_count = lqe->lqe_qtune;
                }
                qbody->qb_flags = QUOTA_DQACQ_FL_REL;
                RETURN(true);
        }

        /* 3. Any quota overrun? */
        if (lqe->lqe_usage > lqe->lqe_granted) {
                /* we overconsumed quota space, we report usage in request so
                 * that master can adjust it unconditionally */
                if (qbody == NULL)
                        RETURN(true);
                qbody->qb_usage = lqe->lqe_usage;
                /* from here on, consider the reported usage as granted so
                 * that step 4 computes the spare space consistently */
                granted         = lqe->lqe_usage;
                qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
        }

        /* 4. Time to pre-acquire? */
        if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
            lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
                /* To pre-acquire quota space, we report how much spare quota
                 * space the slave currently owns, then the master will grant us
                 * back how much we can pretend given the current state of
                 * affairs */
                if (qbody == NULL)
                        RETURN(true);
                if (granted <= usage)
                        qbody->qb_count = 0;
                else
                        qbody->qb_count = granted - usage;
                qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
                RETURN(true);
        }

        /* flags may have been set by the overrun report in step 3 */
        if (qbody != NULL)
                RETURN(qbody->qb_flags != 0);
        else
                RETURN(false);
}
303
304 /**
305  * Helper function returning true when quota space need to be adjusted (some
306  * unused space should be free or pre-acquire) and false otherwise.
307  */
308 static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
309 {
310         return qsd_calc_adjust(lqe, NULL);
311 }
312
/**
 * Callback function called when an acquire/release request sent to the master
 * is completed. It updates the lquota entry according to the reply, schedules
 * an update of the slave index copy and kicks off space adjustment or per-ID
 * lock cancellation when appropriate.
 *
 * \param env     - environment passed by ptlrpcd, refilled with DT tags below
 * \param qqi     - the qsd_qtype_info the request was sent for
 * \param reqbody - the quota body packed in the original request
 * \param repbody - the quota body extracted from the reply, NULL on error
 * \param lockh   - handle of the per-ID lock referenced by the request; the
 *                  reference is dropped here
 * \param lvb     - lvb buffer allocated by the sender, freed here
 * \param arg     - opaque pointer, actually the lquota_entry; the reference
 *                  taken by the sender is released here
 * \param ret     - request return code
 */
static void qsd_req_completion(const struct lu_env *env,
                               struct qsd_qtype_info *qqi,
                               struct quota_body *reqbody,
                               struct quota_body *repbody,
                               struct lustre_handle *lockh,
                               struct lquota_lvb *lvb,
                               void *arg, int ret)
{
        struct lquota_entry     *lqe = (struct lquota_entry *)arg;
        struct qsd_thread_info  *qti;
        int                      rc;
        bool                     adjust = false, cancel = false;
        ENTRY;

        LASSERT(qqi != NULL && lqe != NULL);

        /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
         * DT tags set. */
        rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
        if (rc) {
                LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
                lqe_write_lock(lqe);
                /* can't afford to adjust quota space with no suitable lu_env */
                GOTO(out_noadjust, rc);
        }
        qti = qsd_info(env);

        lqe_write_lock(lqe);
        LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
                     reqbody->qb_flags);

        /* despite -EDQUOT & -EINPROGRESS errors, the master might still
         * grant us back quota space to adjust quota overrun */
        if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
                if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
                   ret != -ESHUTDOWN && ret != -EAGAIN)
                        /* print errors only if return code is unexpected */
                        LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
                                     ret, reqbody->qb_flags);
                GOTO(out, ret);
        }

        /* Set the lqe_lockh */
        if (lustre_handle_is_used(lockh) &&
            !lustre_handle_equal(lockh, &lqe->lqe_lockh))
                lustre_handle_copy(&lqe->lqe_lockh, lockh);

        /* If the replied qb_count is zero, it means master didn't process
         * the DQACQ since the limit for this ID has been removed, so we
         * should not update quota entry & slave index copy neither. */
        if (repbody != NULL && repbody->qb_count != 0) {
                LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);

                if (lqe->lqe_is_reset) {
                        lqe->lqe_granted = 0;
                } else if (req_is_rel(reqbody->qb_flags)) {
                        /* release request: subtract the released amount,
                         * clamping to 0 on inconsistency */
                        if (lqe->lqe_granted < repbody->qb_count) {
                                LQUOTA_ERROR(lqe, "can't release more space "
                                             "than owned %llu<%llu",
                                             lqe->lqe_granted,
                                             repbody->qb_count);
                                lqe->lqe_granted = 0;
                        } else {
                                lqe->lqe_granted -= repbody->qb_count;
                        }
                        /* Cancel the per-ID lock initiatively when there
                         * isn't any usage & grant, which can avoid master
                         * sending glimpse unnecessarily to this slave on
                         * quota revoking */
                        if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
                            !lqe->lqe_waiting_write && !lqe->lqe_usage)
                                cancel = true;
                } else {
                        /* acquire request: add the newly granted amount */
                        lqe->lqe_granted += repbody->qb_count;
                }
                qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
                lqe_write_unlock(lqe);

                /* Update the slave index file in the dedicated thread. So far,
                 * We don't update the version of slave index copy on DQACQ.
                 * No locking is necessary since nobody can change
                 * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
                if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_GRANT))
                        qti->qti_rec.lqr_slv_rec.qsr_granted =
                                                        0xFFFFFFFFFFDEC80CULL;
                qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
                                 false);
                lqe_write_lock(lqe);
        }

        /* extract information from lvb */
        if (ret == 0 && lvb != NULL) {
                if (lvb->lvb_id_qunit != 0)
                        qsd_set_qunit(lqe, lvb->lvb_id_qunit);
                qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
        } else if (repbody != NULL && repbody->qb_qunit != 0) {
                qsd_set_qunit(lqe, repbody->qb_qunit);
        }

        /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
         * flooding the master with acquire request. Pre-acquire will be turned
         * on again as soon as qunit is modified */
        if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
                lqe->lqe_nopreacq = true;
out:
        adjust = qsd_adjust_needed(lqe);
        /* cache the result of acquire requests so that qsd_acquire_remote()
         * can short-circuit retries issued right after */
        if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
                lqe->lqe_acq_rc = ret;
                lqe->lqe_acq_time = ktime_get_seconds();
        }
out_noadjust:
        qsd_request_exit(lqe);
        lqe_write_unlock(lqe);

        /* release reference on per-ID lock */
        if (lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);

        if (cancel) {
                qsd_adjust_schedule(lqe, false, true);
        } else if (adjust) {
                /* defer the adjustment on -EDQUOT/success, retry later on
                 * other errors */
                if (!ret || ret == -EDQUOT)
                        qsd_adjust_schedule(lqe, false, false);
                else
                        qsd_adjust_schedule(lqe, true, false);
        }
        lqe_putref(lqe);

        if (lvb)
                OBD_FREE_PTR(lvb);
        EXIT;
}
449
450 /**
451  * Try to consume local quota space.
452  *
453  * \param lqe   - is the qid entry to be processed
454  * \param space - is the amount of quota space needed to complete the operation
455  *
456  * \retval 0       - success
457  * \retval -EDQUOT - out of quota
458  * \retval -EAGAIN - need to acquire space from master
459  */
460 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
461 {
462         __u64   usage;
463         int     rc;
464         ENTRY;
465
466         if (!lqe->lqe_enforced)
467                 /* not enforced any more, we are good */
468                 RETURN(-ESRCH);
469
470         lqe_write_lock(lqe);
471         /* use latest usage */
472         usage = lqe->lqe_usage;
473         /* take pending write into account */
474         usage += lqe->lqe_pending_write;
475
476         if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
477                 /* Yay! we got enough space */
478                 lqe->lqe_pending_write += space;
479                 lqe->lqe_waiting_write -= space;
480                 rc = 0;
481         /* lqe_edquot flag is used to avoid flooding dqacq requests when
482          * the user is over quota, however, the lqe_edquot could be stale
483          * sometimes due to the race reply of dqacq vs. id lock glimpse
484          * (see LU-4505), so we revalidate it every 5 seconds. */
485         } else if (lqe->lqe_edquot &&
486                    (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
487                 rc = -EDQUOT;
488         }else {
489                 rc = -EAGAIN;
490         }
491         lqe_write_unlock(lqe);
492
493         RETURN(rc);
494 }
495
496 /**
497  * Compute how much quota space should be acquire from the master based
498  * on how much is currently granted to this slave and pending/waiting
499  * operations.
500  *
501  * \param lqe - is the lquota entry for which we would like to adjust quota
502  *              space.
503  * \param qbody - is the quota body of the acquire request to fill
504  *
505  * \retval true  - space acquisition is needed and qbody is filled
506  * \retval false - no space acquisition required
507  */
508 static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
509                                     struct quota_body *qbody)
510 {
511         __u64   usage, granted;
512
513         usage   = lqe->lqe_usage;
514         usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
515         granted = lqe->lqe_granted;
516
517         qbody->qb_flags = 0;
518
519         /* if we overconsumed quota space, we report usage in request so that
520          * master can adjust it unconditionally */
521         if (lqe->lqe_usage > lqe->lqe_granted) {
522                 qbody->qb_usage = lqe->lqe_usage;
523                 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
524                 granted = lqe->lqe_usage;
525         }
526
527         /* acquire as much as needed, but not more */
528         if (usage > granted) {
529                 qbody->qb_count  = usage - granted;
530                 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
531         }
532
533         return qbody->qb_flags != 0;
534 }
535
/**
 * Acquire quota space from master.
 * There are at most 1 in-flight dqacq/dqrel.
 *
 * The request is sent asynchronously; qsd_req_completion() runs when it
 * completes and releases the lqe reference taken here.
 *
 * \param env    - the environment passed by the caller
 * \param lqe    - is the qid entry to be processed
 *
 * \retval 0            - success
 * \retval -EDQUOT      - out of quota
 * \retval -EINPROGRESS - inform client to retry write/create
 * \retval -EBUSY       - already a quota request in flight
 * \retval -ve          - other appropriate errors
 */
static int qsd_acquire_remote(const struct lu_env *env,
                              struct lquota_entry *lqe)
{
        struct qsd_thread_info  *qti = qsd_info(env);
        struct quota_body       *qbody = &qti->qti_body;
        struct qsd_instance     *qsd;
        struct qsd_qtype_info   *qqi;
        int                      rc;
        ENTRY;

        memset(qbody, 0, sizeof(*qbody));
        /* bail out early if the qsd isn't ready to talk to the master;
         * this also fills the global lock handle */
        rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
        if (rc)
                RETURN(rc);

        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;

        lqe_write_lock(lqe);

        /* is quota really enforced for this id? */
        if (!lqe->lqe_enforced) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "quota not enforced any more");
                RETURN(0);
        }

        /* fill qb_count & qb_flags */
        if (!qsd_calc_acquire(lqe, qbody)) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "No acquire required");
                RETURN(0);
        }

        /* check whether an acquire request completed recently, if so, use the
         * cached result rather than hammering the master */
        if (lqe->lqe_acq_rc != 0 &&
            lqe->lqe_acq_time > ktime_get_seconds() - 1) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
                RETURN(lqe->lqe_acq_rc);
        }

        /* only 1 quota request in flight for a given ID is allowed */
        rc = qsd_request_enter(lqe);
        if (rc) {
                lqe_write_unlock(lqe);
                RETURN(rc);
        }

        lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
        lqe_write_unlock(lqe);

        /* hold a refcount until completion */
        lqe_getref(lqe);

        /* fill other quota body fields */
        qbody->qb_fid = qqi->qqi_fid;
        qbody->qb_id  = lqe->lqe_id;

        /* check whether we already own a valid lock for this ID */
        rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
        if (rc) {
                struct lquota_lvb *lvb;

                OBD_ALLOC_PTR(lvb);
                if (lvb == NULL) {
                        rc = -ENOMEM;
                        /* run the completion callback by hand so that the
                         * in-flight state and the lqe reference are released */
                        qsd_req_completion(env, qqi, qbody, NULL,
                                           &qti->qti_lockh, NULL, lqe, rc);
                        RETURN(rc);
                }
                /* no lock found, should use intent */
                rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
                                     IT_QUOTA_DQACQ, qsd_req_completion,
                                     qqi, lvb, (void *)lqe);
        } else {
                /* lock found, should use regular dqacq */
                rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
                                    qsd_req_completion, qqi, &qti->qti_lockh,
                                    lqe);
        }

        /* the completion function will be called by qsd_send_dqacq or
         * qsd_intent_lock */
        RETURN(rc);
}
635
/**
 * Acquire \a space of quota space in order to complete an operation.
 * Try to consume local quota space first and send acquire request to quota
 * master if required.
 *
 * Used as the condition of wait_event_idle_timeout() in qsd_op_begin0(), so
 * it is re-invoked each time the waiter is woken up.
 *
 * \param env   - the environment passed by the caller
 * \param lqe   - is the qid entry to be processed
 * \param space - is the amount of quota required for the operation
 * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
 *
 * \retval true  - stop waiting in wait_event_idle_timeout,
 *                 and real return value in \a ret
 * \retval false - continue waiting
 */
static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                        long long space, int *ret)
{
        int rc = 0, count;
        int wait_pending = 0;
        struct qsd_qtype_info *qqi = lqe2qqi(lqe);

        ENTRY;

        /* loop until local space is consumed, an error occurs or a remote
         * request is put in flight (-EBUSY on re-entry) */
        for (count = 0; rc == 0; count++) {
                LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
again:
                if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
                        rc = -EINPROGRESS;
                        break;
                }

                /* refresh disk usage */
                rc = qsd_refresh_usage(env, lqe);
                if (rc)
                        break;

                /* try to consume local quota space first */
                rc = qsd_acquire_local(lqe, space);
                if (rc != -EAGAIN)
                        /* rc == 0, Wouhou! enough local quota space
                         * rc < 0, something bad happened */
                         break;
                /*
                 * There might be a window that commit transaction
                 * have updated usage but pending write doesn't change
                 * wait for it before acquiring remotely.
                 */
                if (lqe->lqe_pending_write >= space && !wait_pending) {
                        wait_pending = 1;
                        dt_sync(env, qqi->qqi_qsd->qsd_dev);
                        goto again;
                }

                /* if we have gotten some quota and still wait more quota,
                 * it's better to give QMT some time to reclaim from clients */
                if (count > 0)
                        schedule_timeout_interruptible(cfs_time_seconds(1));

                /* need to acquire more quota space from master */
                rc = qsd_acquire_remote(env, lqe);
        }

        if (rc == -EBUSY)
                /* already a request in flight, continue waiting */
                RETURN(false);
        *ret = rc;
        RETURN(true);
}
704
705 /**
706  * Quota enforcement handler. If local quota can satisfy this operation,
707  * return success, otherwise, acquire more quota from master.
708  * (for write operation, if master isn't available at this moment, return
709  * -EINPROGRESS to inform client to retry the write)
710  *
711  * \param env   - the environment passed by the caller
712  * \param qsd   - is the qsd instance associated with the device in charge
713  *                of the operation.
714  * \param qid   - is the qid information attached in the transaction handle
715  * \param space - is the space required by the operation
716  * \param flags - if the operation is write, return caller no user/group
717  *                and sync commit flags
718  *
719  * \retval 0            - success
720  * \retval -EDQUOT      - out of quota
721  * \retval -EINPROGRESS - inform client to retry write
722  * \retval -ve          - other appropriate errors
723  */
724 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
725                          struct lquota_id_info *qid, long long space,
726                          enum osd_quota_local_flags *local_flags)
727 {
728         struct lquota_entry *lqe;
729         enum osd_quota_local_flags qtype_flag = 0;
730         int rc, ret = -EINPROGRESS;
731         ENTRY;
732
733         if (qid->lqi_qentry != NULL) {
734                 /* we already had to deal with this id for this transaction */
735                 lqe = qid->lqi_qentry;
736                 if (!lqe->lqe_enforced)
737                         RETURN(0);
738         } else {
739                 /* look up lquota entry associated with qid */
740                 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
741                 if (IS_ERR(lqe))
742                         RETURN(PTR_ERR(lqe));
743                 if (!lqe->lqe_enforced) {
744                         lqe_putref(lqe);
745                         RETURN(0);
746                 }
747                 qid->lqi_qentry = lqe;
748                 /* lqe will be released in qsd_op_end() */
749         }
750
751         if (space <= 0) {
752                 /* when space is negative or null, we don't need to consume
753                  * quota space. That said, we still want to perform space
754                  * adjustments in qsd_op_end, so we return here, but with
755                  * a reference on the lqe */
756                 if (local_flags != NULL) {
757                         rc = qsd_refresh_usage(env, lqe);
758                         GOTO(out_flags, rc);
759                 }
760                 RETURN(0);
761         }
762
763         LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
764
765         lqe_write_lock(lqe);
766         lqe->lqe_waiting_write += space;
767         lqe_write_unlock(lqe);
768
769         /* acquire quota space for the operation, cap overall wait time to
770          * prevent a service thread from being stuck for too long */
771         rc = wait_event_idle_timeout(
772                 lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
773                 cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
774
775         if (rc > 0 && ret == 0) {
776                 qid->lqi_space += space;
777                 rc = 0;
778         } else {
779                 if (rc > 0)
780                         rc = ret;
781                 else if (rc == 0)
782                         rc = -ETIMEDOUT;
783
784                 LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
785
786                 lqe_write_lock(lqe);
787                 lqe->lqe_waiting_write -= space;
788
789                 if (local_flags && lqe->lqe_pending_write != 0)
790                         /* Inform OSD layer that there are pending writes.
791                          * It might want to retry after a sync if appropriate */
792                          *local_flags |= QUOTA_FL_SYNC;
793                 lqe_write_unlock(lqe);
794
795                 /* convert recoverable error into -EINPROGRESS, client will
796                  * retry */
797                 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
798                     rc == -EAGAIN || rc == -EINTR) {
799                         rc = -EINPROGRESS;
800                 } else if (rc == -ESRCH) {
801                         rc = 0;
802                         LQUOTA_ERROR(lqe, "ID isn't enforced on master, it "
803                                      "probably due to a legeal race, if this "
804                                      "message is showing up constantly, there "
805                                      "could be some inconsistence between "
806                                      "master & slave, and quota reintegration "
807                                      "needs be re-triggered.");
808                 }
809         }
810
811         if (local_flags != NULL) {
812 out_flags:
813                 LASSERT(qid->lqi_is_blk);
814                 if (rc != 0) {
815                         *local_flags |= lquota_over_fl(qqi->qqi_qtype);
816                 } else {
817                         __u64   usage;
818
819                         lqe_read_lock(lqe);
820                         usage = lqe->lqe_pending_write;
821                         usage += lqe->lqe_waiting_write;
822                         /* There is a chance to successfully grant more quota
823                          * but get edquot flag through glimpse. */
824                         if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
825                            (usage % lqe->lqe_qunit >
826                             qqi->qqi_qsd->qsd_sync_threshold)))
827                                 usage += qqi->qqi_qsd->qsd_sync_threshold;
828
829                         usage += lqe->lqe_usage;
830
831                         qtype_flag = lquota_over_fl(qqi->qqi_qtype);
832                         /* if we should notify client to start sync write */
833                         if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
834                                 *local_flags |= qtype_flag;
835                         else
836                                 *local_flags &= ~qtype_flag;
837                         lqe_read_unlock(lqe);
838                 }
839         }
840         RETURN(rc);
841 }
842
843 /**
844  * helper function comparing two lquota_id_info structures
845  */
846 static inline bool qid_equal(struct lquota_id_info *q1,
847                              struct lquota_id_info *q2)
848 {
849         if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
850                 return false;
851         return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
852 }
853
854 /**
855  * Enforce quota, it's called in the declaration of each operation.
856  * qsd_op_end() will then be called later once all the operations have been
857  * completed in order to release/adjust the quota space.
858  *
859  * \param env   - the environment passed by the caller
860  * \param qsd   - is the qsd instance associated with the device in charge of
861  *                the operation.
862  * \param trans - is the quota transaction information
863  * \param qi    - qid & space required by current operation
864  * \param local_flags - if the operation is a write, used to return the
865  *                per-type over-quota and sync-commit flags to the caller
866  *
867  * \retval 0            - success
868  * \retval -EDQUOT      - out of quota
869  * \retval -EINPROGRESS - inform client to retry write
870  * \retval -ve          - other appropriate errors
871  */
872 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
873                  struct lquota_trans *trans, struct lquota_id_info *qi,
874                  enum osd_quota_local_flags *local_flags)
875 {
876         int     i, rc;
877         bool    found = false;
878         ENTRY;
879
880         /* fast path, ignore quota enforcement request for root owned files */
881         if (qi->lqi_id.qid_uid == 0)
882                 return 0;
883
884         if (unlikely(qsd == NULL))
885                 RETURN(0);
886
887         if (qsd->qsd_dev->dd_rdonly)
888                 RETURN(0);
889
890         /* We don't enforce quota until the qsd_instance is started */
891         read_lock(&qsd->qsd_lock);
892         if (!qsd->qsd_started) {
893                 read_unlock(&qsd->qsd_lock);
894                 RETURN(0);
895         }
896         read_unlock(&qsd->qsd_lock);
897
898         /* ignore block quota on MDTs, ignore inode quota on OSTs */
899         if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
900             (qsd->qsd_is_md && qi->lqi_is_blk))
901                 RETURN(0);
902
903         /* ignore quota enforcement request when:
904          *    - quota isn't enforced for this quota type
905          * or - the user/group is root
906          * or - quota accounting isn't enabled */
907         if (!qsd_type_enabled(qsd, qi->lqi_type) ||
908             (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
909                 RETURN(0);
910
911         if (local_flags && qi->lqi_id.qid_projid && qsd->qsd_root_prj_enable)
912                 *local_flags |= QUOTA_FL_ROOT_PRJQUOTA;
913
914         LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
915                  trans->lqt_id_cnt);
916         /* check whether we already allocated a slot for this id */
917         for (i = 0; i < trans->lqt_id_cnt; i++) {
918                 if (qid_equal(qi, &trans->lqt_ids[i])) {
919                         found = true;
920                         break;
921                 }
922         }
923
924         if (!found) {
925                 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
926                         CERROR("%s: more than %d qids enforced for a "
927                                "transaction?\n", qsd->qsd_svname, i);
928                         RETURN(-EINVAL);
929                 }
930
931                 /* fill new slot */
932                 trans->lqt_ids[i].lqi_id     = qi->lqi_id;
933                 trans->lqt_ids[i].lqi_type   = qi->lqi_type;
934                 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
935                 trans->lqt_id_cnt++;
936         }
937
938         /* manage quota enforcement for this ID */
939         rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
940                            &trans->lqt_ids[i], qi->lqi_space, local_flags);
941         RETURN(rc);
942 }
943 EXPORT_SYMBOL(qsd_op_begin);
944
945 /**
946  * Adjust quota space (by acquiring or releasing) hold by the quota slave.
947  * This function is called after each quota request completion and during
948  * reintegration in order to report usage or re-acquire quota locks.
949  * Space adjustment is aborted if there is already a quota request in flight
950  * for this ID.
951  *
952  * \param env    - the environment passed by the caller
953  * \param lqe    - is the qid entry to be processed
954  *
955  * \retval 0 on success, appropriate errors on failure
956  */
957 int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
958 {
959         struct qsd_thread_info  *qti = qsd_info(env);
960         struct quota_body       *qbody = &qti->qti_body;
961         struct qsd_instance     *qsd;
962         struct qsd_qtype_info   *qqi;
963         int                      rc;
964         bool                     intent = false;
965         ENTRY;
966
967         memset(qbody, 0, sizeof(*qbody));
968         rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
969         if (rc) {
970                 /* add to adjust list again to trigger adjustment later when
971                  * slave is ready */
972                 LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
973                 qsd_adjust_schedule(lqe, true, false);
974                 RETURN(0);
975         }
976
977         qqi = lqe2qqi(lqe);
978         qsd = qqi->qqi_qsd;
979
980         if (qsd->qsd_dev->dd_rdonly)
981                 RETURN(0);
982
983         lqe_write_lock(lqe);
984
985         /* fill qb_count & qb_flags */
986         if (!qsd_calc_adjust(lqe, qbody)) {
987                 lqe_write_unlock(lqe);
988                 LQUOTA_DEBUG(lqe, "no adjustment required");
989                 RETURN(0);
990         }
991
992         /* only 1 quota request in flight for a given ID is allowed */
993         rc = qsd_request_enter(lqe);
994         if (rc) {
995                 /* already a request in flight, space adjustment will be run
996                  * again on request completion */
997                 lqe_write_unlock(lqe);
998                 RETURN(0);
999         }
1000
             /* NOTE(review): lqe_pending_rel is presumably cleared by
              * qsd_req_completion() once the master acknowledges -- confirm */
1001         if (req_is_rel(qbody->qb_flags))
1002                 lqe->lqe_pending_rel = qbody->qb_count;
1003         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh); /* snapshot current per-ID lock handle */
1004         lqe_write_unlock(lqe);
1005
1006         /* hold a refcount until completion */
1007         lqe_getref(lqe);
1008
1009         /* fill other quota body fields */
1010         qbody->qb_fid = qqi->qqi_fid;
1011         qbody->qb_id  = lqe->lqe_id;
1012
1013         if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
1014                 /* check whether we own a valid lock for this ID */
1015                 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
1016                 if (rc) {
1017                         memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
1018                         if (req_is_preacq(qbody->qb_flags)) {
1019                                 if (req_has_rep(qbody->qb_flags))
1020                                         /* still want to report usage */
1021                                         qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
1022                                 else
1023                                         /* no pre-acquire if no per-ID lock */
1024                                         GOTO(out, rc = -ENOLCK);
1025                         } else {
1026                                 /* no lock found, should use intent */
1027                                 intent = true;
1028                         }
1029                 } else if (req_is_acq(qbody->qb_flags) &&
1030                            qbody->qb_count == 0) {
1031                         /* found cached lock, no need to acquire */
1032                         GOTO(out, rc = 0);
1033                 }
1034         } else {
1035                 /* release and report don't need a per-ID lock */
1036                 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
1037         }
1038
1039         if (!intent) {
1040                 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
1041                                     qsd_req_completion, qqi, &qti->qti_lockh,
1042                                     lqe);
1043         } else {
1044                 struct lquota_lvb *lvb;
1045
1046                 OBD_ALLOC_PTR(lvb);
1047                 if (lvb == NULL)
1048                         GOTO(out, rc = -ENOMEM);
1049
1050                 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
1051                                      IT_QUOTA_DQACQ, qsd_req_completion,
1052                                      qqi, lvb, (void *)lqe);
1053         }
1054         /* the completion function will be called by qsd_send_dqacq or
1055          * qsd_intent_lock */
1056         RETURN(rc);
1057 out:
1058         qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
1059                            rc);
1060         return rc;
1061 }
1062
1063 /**
1064  * Post quota operation, pre-acquire/release quota from master.
1065  *
1066  * \param  env  - the environment passed by the caller
1067  * \param  qqi  - is the qsd_qtype_info structure associated with the quota ID
1068  *                subject to the operation
1069  * \param  qid  - stores information related to this ID for the operation
1070  *                which has just completed
1076  */
1077 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
1078                         struct lquota_id_info *qid)
1079 {
1080         struct lquota_entry     *lqe;
1081         bool                     adjust;
1082         ENTRY;
1083
1084         lqe = qid->lqi_qentry;
1085         if (lqe == NULL)
1086                 RETURN_EXIT;
1087         qid->lqi_qentry = NULL; /* we now own the reference taken in qsd_op_begin0() */
1088
1089         /* refresh cached usage if a suitable environment is passed */
1090         if (env != NULL)
1091                 qsd_refresh_usage(env, lqe);
1092
             /* deduct the space consumed by the completed operation from the
              * pending-write counter */
1093         lqe_write_lock(lqe);
1094         if (qid->lqi_space > 0) {
1095                 if (lqe->lqe_pending_write < qid->lqi_space) {
1096                         LQUOTA_ERROR(lqe,
1097                                      "More pending write quota to reduce (pending %llu, space %lld)\n",
1098                                      lqe->lqe_pending_write, qid->lqi_space);
1099                         lqe->lqe_pending_write = 0; /* clamp to avoid underflow */
1100                 } else {
1101                         lqe->lqe_pending_write -= qid->lqi_space;
1102                 }
1103         }
             /* without an env we cannot check usage here, so conservatively
              * assume an adjustment is needed */
1104         if (env != NULL)
1105                 adjust = qsd_adjust_needed(lqe);
1106         else
1107                 adjust = true;
1108         lqe_write_unlock(lqe);
1109
1110         if (adjust) {
1111                 /* pre-acquire/release quota space is needed */
1112                 if (env != NULL)
1113                         qsd_adjust(env, lqe);
1114                 else
1115                         /* no suitable environment, handle adjustment in
1116                          * separate thread context */
1117                         qsd_adjust_schedule(lqe, false, false);
1118         }
1119         lqe_putref(lqe);
1120         EXIT;
1121 }
1122
1123 /**
1124  * Post quota operation. It's called after each operation transaction stopped.
1125  *
1126  * \param  env   - the environment passed by the caller
1127  * \param  qsd   - is the qsd instance associated with device which is handling
1128  *                 the operation.
1129  * \param  trans - the quota transaction holding all the qid entries
1130  *                 declared via qsd_op_begin() for this operation
1134  */
1135 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
1136                 struct lquota_trans *trans)
1137 {
1138         int i;
1139         ENTRY;
1140
1141         if (unlikely(qsd == NULL))
1142                 RETURN_EXIT;
1143
1144         if (qsd->qsd_dev->dd_rdonly)
1145                 RETURN_EXIT;
1146
1147         /* We don't enforce quota until the qsd_instance is started */
1148         read_lock(&qsd->qsd_lock);
1149         if (!qsd->qsd_started) {
1150                 read_unlock(&qsd->qsd_lock);
1151                 RETURN_EXIT;
1152         }
1153         read_unlock(&qsd->qsd_lock);
1154
1155         LASSERT(trans != NULL);
1156
1157         for (i = 0; i < trans->lqt_id_cnt; i++) {
1158                 struct qsd_qtype_info *qqi;
1159
1160                 if (trans->lqt_ids[i].lqi_qentry == NULL)
1161                         continue;
1162
1163                 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
1164                 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
1165         }
1166
1167         /* reset id_count to 0 so that a second accidental call to qsd_op_end()
1168          * does not result in failure */
1169         trans->lqt_id_cnt = 0;
1170         EXIT;
1171 }
1172 EXPORT_SYMBOL(qsd_op_end);
1173
1174 /* Simple wrapper on top of qsd API which implement quota transfer for osd
1175  * setattr needs. As a reminder, only the root user can change ownership of
1176  * a file, that's why EDQUOT & EINPROGRESS errors are discarded
1177  */
1178 int qsd_transfer(const struct lu_env *env, struct qsd_instance *qsd,
1179                  struct lquota_trans *trans, unsigned int qtype,
1180                  u64 orig_id, u64 new_id, u64 bspace,
1181                  struct lquota_id_info *qi)
1182 {
1183         int rc;
1184
1185         if (unlikely(!qsd))
1186                 return 0;
1187
1188         LASSERT(qtype < LL_MAXQUOTAS);
1189         if (qtype == PRJQUOTA)
1190                 if (!projid_valid(make_kprojid(&init_user_ns, new_id)))
1191                         return -EINVAL;
1192
1193         qi->lqi_type = qtype;
1194
1195         /* inode accounting */
1196         qi->lqi_is_blk = false;
1197
1198         /* one more inode for the new owner ... */
1199         qi->lqi_id.qid_uid = new_id;
1200         qi->lqi_space = 1;
1201         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1202         if (rc == -EDQUOT || rc == -EINPROGRESS)
1203                 rc = 0;
1204         if (rc)
1205                 return rc;
1206
1207         /* and one less inode for the current id */
1208         qi->lqi_id.qid_uid = orig_id;
1209         qi->lqi_space = -1;
1210         /* can't get EDQUOT when reducing usage */
1211         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1212         if (rc == -EINPROGRESS)
1213                 rc = 0;
1214         if (rc)
1215                 return rc;
1216
1217         /* block accounting */
1218         qi->lqi_is_blk = true;
1219
1220         /* more blocks for the new owner ... */
1221         qi->lqi_id.qid_uid = new_id;
1222         qi->lqi_space = bspace;
1223         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1224         if (rc == -EDQUOT || rc == -EINPROGRESS)
1225                 rc = 0;
1226         if (rc)
1227                 return rc;
1228
1229         /* and finally less blocks for the current owner */
1230         qi->lqi_id.qid_uid = orig_id;
1231         qi->lqi_space = -bspace;
1232         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1233         /* can't get EDQUOT when reducing usage */
1234         if (rc == -EINPROGRESS)
1235                 rc = 0;
1236         return rc;
1237 }
1238 EXPORT_SYMBOL(qsd_transfer);
1239
1240 /**
1241  * Trigger pre-acquire/release if necessary.
1242  * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
1243  * quota accounting isn't updated when the transaction stopped. Instead, it'll
1244  * be updated on the final iput, so qsd_op_adjust() will be called then (in
1245  * osd_object_delete()) to trigger quota release if necessary.
1246  *
1247  * \param env - the environment passed by the caller
1248  * \param qsd - is the qsd instance associated with the device in charge
1249  *              of the operation.
1250  * \param qid - is the lquota ID of the user/group for which to trigger
1251  *              quota space adjustment
1252  * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
1253  */
1254 void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
1255                    union lquota_id *qid, int qtype)
1256 {
1257         struct lquota_entry    *lqe;
1258         struct qsd_qtype_info  *qqi;
1259         bool                    adjust;
1260         ENTRY;
1261
1262         if (unlikely(qsd == NULL))
1263                 RETURN_EXIT;
1264
1265         /* We don't enforce quota until the qsd_instance is started */
1266         read_lock(&qsd->qsd_lock);
1267         if (!qsd->qsd_started) {
1268                 read_unlock(&qsd->qsd_lock);
1269                 RETURN_EXIT;
1270         }
1271         read_unlock(&qsd->qsd_lock);
1272
1273         qqi = qsd->qsd_type_array[qtype];
1274         LASSERT(qqi);
1275
1276         if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
1277             qid->qid_uid == 0 || qsd->qsd_dev->dd_rdonly)
1278                 RETURN_EXIT;
1279
1280         read_lock(&qsd->qsd_lock);
1281         if (!qsd->qsd_started) {
1282                 read_unlock(&qsd->qsd_lock);
1283                 RETURN_EXIT;
1284         }
1285         read_unlock(&qsd->qsd_lock);
1286
1287         lqe = lqe_locate(env, qqi->qqi_site, qid);
1288         if (IS_ERR(lqe)) {
1289                 CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
1290                        qsd->qsd_svname, qid->qid_uid, qtype);
1291                 RETURN_EXIT;
1292         }
1293
1294         qsd_refresh_usage(env, lqe);
1295
1296         lqe_read_lock(lqe);
1297         adjust = qsd_adjust_needed(lqe);
1298         lqe_read_unlock(lqe);
1299
1300         if (adjust)
1301                 qsd_adjust(env, lqe);
1302
1303         lqe_putref(lqe);
1304         EXIT;
1305 }
1306 EXPORT_SYMBOL(qsd_op_adjust);
1307
1308 /**
1309  * Reserve or free quota.
1310  *
1311  * Currently, It's used to reserve quota space before changing the file's group
1312  * for normal user and free the reserved quota after the group change.
1313  *
1314  * \param env     - the environment passed by the caller
1315  * \param qsd     - is the qsd instance associated with the device in charge of
1316  *                  the operation.
1317  * \param qi      - qid & space required by current operation
1318  *
1319  * \retval 0            - success
1320  * \retval -EDQUOT      - out of quota
1321  * \retval -EINPROGRESS - inform client to retry write
1322  * \retval -ve          - other appropriate errors
1323  */
1324 int qsd_reserve_or_free_quota(const struct lu_env *env,
1325                               struct qsd_instance *qsd,
1326                               struct lquota_id_info *qi)
1327 {
1328         struct qsd_qtype_info  *qqi;
1329         bool is_free = qi->lqi_space < 0;
1330         int rc = 0;
1331
1332         ENTRY;
1333
1334         if (unlikely(qsd == NULL))
1335                 RETURN(0);
1336
1337         if (qsd->qsd_dev->dd_rdonly)
1338                 RETURN(0);
1339
1340         if (is_free)
1341                 qi->lqi_space *= -1; /* normalize to a positive amount; note this mutates caller's qi */
1342
1343         /* We don't enforce quota until the qsd_instance is started */
1344         read_lock(&qsd->qsd_lock);
1345         if (!qsd->qsd_started) {
1346                 read_unlock(&qsd->qsd_lock);
1347                 RETURN(0);
1348         }
1349         read_unlock(&qsd->qsd_lock);
1350
1351         qqi = qsd->qsd_type_array[qi->lqi_type];
1352         LASSERT(qqi);
1353
1354         CDEBUG(D_QUOTA, "type %s, acct %s, free %d, count %llu\n",
1355                qsd_type_enabled(qsd, qi->lqi_type) ? "enabled" : "disabled",
1356                (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed ? "failed" :
1357                "succeed", is_free, qi->lqi_space);
1358
1359         /* ignore quota enforcement request when:
1360          *    - quota isn't enforced for this quota type
1361          * or - the user/group is root
1362          * or - quota accounting isn't enabled
1363          */
1364         if (!is_free &&
1365             (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
1366               (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed))
1367                 RETURN(0);
1368
             /* freeing is implemented as ending the earlier reservation */
1369         if (is_free) {
1370                 qsd_op_end0(env, qsd->qsd_type_array[qi->lqi_type], qi);
1371         } else {
1372                 long long qspace = qi->lqi_space;
1373
1374                 /* the acquired quota will add to lqi_space in qsd_op_begin0 */
1375                 qi->lqi_space = 0;
1376                 rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], qi,
1377                                    qspace, NULL);
1378                 if (rc && qi->lqi_qentry) {
1379                         lqe_putref(qi->lqi_qentry);
1380                         qi->lqi_qentry = NULL;
1381                 }
1382         }
1383
1384         CDEBUG(D_QUOTA, "%s quota: type %i, uid %llu, gid %llu, space %llu\n",
1385                is_free ? "Free" : "Reserve", qi->lqi_type, qi->lqi_id.qid_uid,
1386                qi->lqi_id.qid_gid, qi->lqi_space);
1387
1388         RETURN(rc);
1389 }
1390 EXPORT_SYMBOL(qsd_reserve_or_free_quota);