Whamcloud - gitweb
LU-19098 hsm: don't print progname twice with lhsmtool
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2012, 2017, Intel Corporation.
5  * Use is subject to license terms.
6  */
7
8 /*
9  * Author: Johann Lombardi <johann.lombardi@intel.com>
10  * Author: Niu    Yawei    <yawei.niu@intel.com>
11  */
12
13 #define DEBUG_SUBSYSTEM S_LQUOTA
14
15 #include "qsd_internal.h"
16
17 /**
18  * helper function bumping lqe_pending_req if there is no quota request in
19  * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
20  */
21 static inline int qsd_request_enter(struct lquota_entry *lqe)
22 {
23         /* is there already a quota request in flight? */
24         if (lqe->lqe_pending_req != 0) {
25                 LQUOTA_DEBUG(lqe, "already a request in flight");
26                 return -EBUSY;
27         }
28
29         if (lqe->lqe_pending_rel != 0) {
30                 LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
31                              lqe->lqe_pending_rel);
32                 LBUG();
33         }
34
35         lqe->lqe_pending_req++;
36         return 0;
37 }
38
39 /**
40  * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
41  */
42 static inline void qsd_request_exit(struct lquota_entry *lqe)
43 {
44         if (lqe->lqe_pending_req != 1) {
45                 LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
46                 LBUG();
47         }
48         lqe->lqe_pending_req--;
49         lqe->lqe_pending_rel = 0;
50         wake_up(&lqe->lqe_waiters);
51 }
52
/**
 * Check whether a qsd instance is all set to send quota request to master.
 * This includes checking whether:
 * - the connection to master is set up and usable,
 * - the qsd isn't stopping
 * - reintegration has been successfully completed and all indexes are
 *   up-to-date
 *
 * \param lqe - is the lquota entry for which we would like to send an quota
 *              request
 * \param lockh - is the remote handle of the global lock returned on success
 *
 * \retval 0            on success
 * \retval -EINPROGRESS qsd is stopping or reintegration still pending;
 *                      caller should retry later
 * \retval -ENOTCONN    connection to the quota master isn't usable
 * \retval -ENOLCK      no valid global lock handle is currently cached
 */
static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
{
	struct qsd_qtype_info	*qqi = lqe2qqi(lqe);
	struct qsd_instance	*qsd = qqi->qqi_qsd;
	struct obd_import	*imp = NULL;
	struct ldlm_lock	*lock;
	ENTRY;

	/* all qsd/qqi state below is sampled under qsd_lock; note that the
	 * lock is dropped on every exit path before returning */
	read_lock(&qsd->qsd_lock);
	/* is the qsd about to shut down? */
	if (qsd->qsd_stopping) {
		read_unlock(&qsd->qsd_lock);
		LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
		/* Target is about to shut down, client will retry */
		RETURN(-EINPROGRESS);
	}

	/* is the connection to the quota master ready? */
	if (qsd->qsd_exp_valid)
		imp = class_exp2cliimp(qsd->qsd_exp);
	if (imp == NULL || imp->imp_invalid) {
		read_unlock(&qsd->qsd_lock);
		LQUOTA_DEBUG(lqe, "connection to master not ready");
		RETURN(-ENOTCONN);
	}

	/* In most case, reintegration must have been triggered (when enable
	 * quota or on OST start), however, in rare race condition (enabling
	 * quota when starting OSTs), we might miss triggering reintegration
	 * for some qqi.
	 *
	 * If the previous reintegration failed for some reason, we'll
	 * re-trigger it here as well. */
	if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
		read_unlock(&qsd->qsd_lock);
		LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
			     "kicking off reintegration");
		qsd_start_reint_thread(qqi);
		RETURN(-EINPROGRESS);
	}

	/* Fill the remote global lock handle, master will check this handle
	 * to see if the slave is sending request with stale lock */
	lustre_handle_copy(lockh, &qqi->qqi_lockh);
	read_unlock(&qsd->qsd_lock);

	/* the cached global lock handle may never have been set up */
	if (!lustre_handle_is_used(lockh))
		RETURN(-ENOLCK);

	/* the handle may also be stale (lock already destroyed) */
	lock = ldlm_handle2lock(lockh);
	if (lock == NULL)
		RETURN(-ENOLCK);

	/* return remote lock handle to be packed in quota request */
	lustre_handle_copy(lockh, &lock->l_remote_handle);
	ldlm_lock_put(lock);

	RETURN(0);
}
126
/**
 * Check whether any quota space adjustment (pre-acquire/release/report) is
 * needed for a given quota ID. If a non-null \a qbody is passed, then the
 * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
 * to be packed in the quota request.
 *
 * Must be called with the lqe held; the caller is expected to serialize
 * access to the lqe fields read and written here.
 *
 * \param lqe   - is the lquota entry for which we would like to adjust quota
 *                space.
 * \param qbody - is the quota body to fill, if not NULL.
 *
 * \retval true  - space adjustment is required and \a qbody is filled, if not
 *                 NULL
 * \retval false - no space adjustment required
 */
static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
{
	__u64	usage, granted;
	ENTRY;

	/* overall consumption: on-disk usage plus in-flight and queued
	 * writes */
	usage   = lqe->lqe_usage;
	usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
	granted = lqe->lqe_granted;

	if (qbody != NULL)
		qbody->qb_flags = 0;

	if (!lqe->lqe_enforced) {
		/* quota not enforced any more for this ID */
		if (granted != 0) {
			/* release all quota space unconditionally */
			LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
			if (qbody != NULL) {
				qbody->qb_count = granted;
				qbody->qb_flags = QUOTA_DQACQ_FL_REL;
			}
			RETURN(true);
		}
		RETURN(false);
	}

	if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
		/* No valid per-ID lock
		 * When reporting quota (during reintegration or on setquota
		 * glimpse), we should release granted space if usage is 0.
		 * Otherwise, if the usage is less than granted, we need to
		 * acquire the per-ID lock to make sure the unused grant can be
		 * reclaimed by per-ID lock glimpse. */
		if (usage == 0) {
			/* no on-disk usage and no outstanding activity, release
			 * space */
			if (granted != 0) {
				LQUOTA_DEBUG(lqe, "no usage, releasing all "
					     "space");
				if (qbody != NULL) {
					qbody->qb_count = granted;
					qbody->qb_flags = QUOTA_DQACQ_FL_REL;
				}
				RETURN(true);
			}
			LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
				     "do");
			RETURN(false);
		}

		if (lqe->lqe_usage < lqe->lqe_granted) {
			/* holding quota space w/o any lock, enqueue per-ID lock
			 * again */
			LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
			if (qbody != NULL) {
				qbody->qb_count = 0;
				qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
			}
			RETURN(true);
		}

		if (lqe->lqe_usage > lqe->lqe_granted) {
			/* quota overrun, report usage */
			LQUOTA_DEBUG(lqe, "overrun, reporting usage");
			if (qbody != NULL) {
				qbody->qb_usage = lqe->lqe_usage;
				qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
			}
			RETURN(true);
		}
		LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
		RETURN(false);
	}

	/* valid per-ID lock
	 * Apply good old quota qunit adjustment logic which has been around
	 * since lustre 1.4:
	 * 1. revoke all extra grant
	 */
	if (lqe->lqe_revoke) {
		if (qbody == NULL)
			RETURN(true);

		/* NOTE: lqe_revoke is only cleared when a qbody is actually
		 * filled, i.e. when a request will really be sent */
		lqe->lqe_revoke = 0;

		LQUOTA_DEBUG(lqe, "revoke pre-acquired quota: %llu - %llu\n",
			     granted, usage);
		qbody->qb_count = granted - usage;
		qbody->qb_flags = QUOTA_DQACQ_FL_REL;
		RETURN(true);
	}

	/* 2. release spare quota space? */
	if (granted > usage + lqe->lqe_qunit) {
		/* pre-release quota space */
		if (qbody == NULL)
			RETURN(true);
		qbody->qb_count = granted - usage;
		/* if usage == 0, release all granted space */
		if (usage) {
			/* try to keep one qunit of quota space */
			qbody->qb_count -= lqe->lqe_qunit;
			/* but don't release less than qtune to avoid releasing
			 * space too often */
			if (qbody->qb_count < lqe->lqe_qtune)
				qbody->qb_count = lqe->lqe_qtune;
		}
		qbody->qb_flags = QUOTA_DQACQ_FL_REL;
		RETURN(true);
	}

	/* 3. Any quota overrun? */
	if (lqe->lqe_usage > lqe->lqe_granted) {
		/* we overconsumed quota space, we report usage in request so
		 * that master can adjust it unconditionally */
		if (qbody == NULL)
			RETURN(true);
		qbody->qb_usage = lqe->lqe_usage;
		granted         = lqe->lqe_usage;
		qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
	}

	/* 4. Time to pre-acquire? */
	if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
	    lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
		/* To pre-acquire quota space, we report how much spare quota
		 * space the slave currently owns, then the master will grant us
		 * back how much we can pretend given the current state of
		 * affairs */
		if (qbody == NULL)
			RETURN(true);
		if (granted <= usage)
			qbody->qb_count = 0;
		else
			qbody->qb_count = granted - usage;
		qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
		RETURN(true);
	}

	/* a REPORT flag may have been set in step 3 even when no pre-acquire
	 * is needed, so adjustment is required whenever any flag is set */
	if (qbody != NULL)
		RETURN(qbody->qb_flags != 0);
	else
		RETURN(false);
}
285
286 /**
287  * Helper function returning true when quota space need to be adjusted (some
288  * unused space should be free or pre-acquire) and false otherwise.
289  */
290 static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
291 {
292         return qsd_calc_adjust(lqe, NULL);
293 }
294
295 /**
296  * Callback function called when an acquire/release request sent to the master
297  * is completed
298  */
299 static void qsd_req_completion(const struct lu_env *env,
300                                struct qsd_qtype_info *qqi,
301                                struct quota_body *reqbody,
302                                struct quota_body *repbody,
303                                struct lustre_handle *lockh,
304                                struct lquota_lvb *lvb,
305                                void *arg, int ret)
306 {
307         struct lquota_entry     *lqe = (struct lquota_entry *)arg;
308         struct qsd_thread_info  *qti;
309         int                      rc;
310         bool                     adjust = false, cancel = false;
311         ENTRY;
312
313         LASSERT(qqi != NULL && lqe != NULL);
314
315         /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
316          * DT tags set. */
317         rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
318         if (rc) {
319                 LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
320                 lqe_write_lock(lqe);
321                 /* can't afford to adjust quota space with no suitable lu_env */
322                 GOTO(out_noadjust, rc);
323         }
324         qti = qsd_info(env);
325
326         lqe_write_lock(lqe);
327         LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
328                      reqbody->qb_flags);
329
330         /* despite -EDQUOT & -EINPROGRESS errors, the master might still
331          * grant us back quota space to adjust quota overrun */
332         if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
333                 if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
334                    ret != -ESHUTDOWN && ret != -EAGAIN)
335                         /* print errors only if return code is unexpected */
336                         LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
337                                      ret, reqbody->qb_flags);
338                 GOTO(out, ret);
339         }
340
341         /* Set the lqe_lockh */
342         if (lustre_handle_is_used(lockh) &&
343             !lustre_handle_equal(lockh, &lqe->lqe_lockh))
344                 lustre_handle_copy(&lqe->lqe_lockh, lockh);
345
346         /* If the replied qb_count is zero, it means master didn't process
347          * the DQACQ since the limit for this ID has been removed, so we
348          * should not update quota entry & slave index copy neither. */
349         if (repbody != NULL && repbody->qb_count != 0) {
350                 LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
351
352                 if (lqe->lqe_is_reset) {
353                         lqe->lqe_granted = 0;
354                 } else if (req_is_rel(reqbody->qb_flags)) {
355                         if (lqe->lqe_granted < repbody->qb_count) {
356                                 LQUOTA_ERROR(lqe, "can't release more space "
357                                              "than owned %llu<%llu",
358                                              lqe->lqe_granted,
359                                              repbody->qb_count);
360                                 lqe->lqe_granted = 0;
361                         } else {
362                                 lqe->lqe_granted -= repbody->qb_count;
363                         }
364                         /* Cancel the per-ID lock initiatively when there
365                          * isn't any usage & grant, which can avoid master
366                          * sending glimpse unnecessarily to this slave on
367                          * quota revoking */
368                         if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
369                             !lqe->lqe_waiting_write && !lqe->lqe_usage)
370                                 cancel = true;
371                 } else {
372                         lqe->lqe_granted += repbody->qb_count;
373                 }
374                 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
375                 lqe_write_unlock(lqe);
376
377                 /* Update the slave index file in the dedicated thread. So far,
378                  * We don't update the version of slave index copy on DQACQ.
379                  * No locking is necessary since nobody can change
380                  * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
381                 if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_GRANT))
382                         qti->qti_rec.lqr_slv_rec.qsr_granted =
383                                                         0xFFFFFFFFFFDEC80CULL;
384                 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
385                                  false);
386                 lqe_write_lock(lqe);
387         }
388
389         /* extract information from lvb */
390         if (ret == 0 && lvb != NULL) {
391                 if (lvb->lvb_id_qunit != 0)
392                         qsd_set_qunit(lqe, lvb->lvb_id_qunit);
393                 qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
394         } else if (repbody != NULL && repbody->qb_qunit != 0) {
395                 qsd_set_qunit(lqe, repbody->qb_qunit);
396         }
397
398         /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
399          * flooding the master with acquire request. Pre-acquire will be turned
400          * on again as soon as qunit is modified */
401         if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
402                 lqe->lqe_nopreacq = true;
403 out:
404         adjust = qsd_adjust_needed(lqe);
405         if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
406                 lqe->lqe_acq_rc = ret;
407                 lqe->lqe_acq_time = ktime_get_seconds();
408         }
409 out_noadjust:
410         qsd_request_exit(lqe);
411         lqe_write_unlock(lqe);
412
413         /* release reference on per-ID lock */
414         if (lustre_handle_is_used(lockh))
415                 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
416
417         if (cancel) {
418                 qsd_adjust_schedule(lqe, false, true);
419         } else if (adjust) {
420                 if (!ret || ret == -EDQUOT)
421                         qsd_adjust_schedule(lqe, false, false);
422                 else
423                         qsd_adjust_schedule(lqe, true, false);
424         }
425         lqe_putref(lqe);
426
427         OBD_FREE_PTR(lvb);
428         EXIT;
429 }
430
431 /**
432  * Try to consume local quota space.
433  *
434  * \param lqe   - is the qid entry to be processed
435  * \param space - is the amount of quota space needed to complete the operation
436  *
437  * \retval 0       - success
438  * \retval -EDQUOT - out of quota
439  * \retval -EAGAIN - need to acquire space from master
440  */
441 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
442 {
443         __u64   usage;
444         int     rc;
445         ENTRY;
446
447         if (!lqe->lqe_enforced)
448                 /* not enforced any more, we are good */
449                 RETURN(-ESRCH);
450
451         lqe_write_lock(lqe);
452         /* use latest usage */
453         usage = lqe->lqe_usage;
454         /* take pending write into account */
455         usage += lqe->lqe_pending_write;
456
457         if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
458                 /* Yay! we got enough space */
459                 lqe->lqe_pending_write += space;
460                 lqe->lqe_waiting_write -= space;
461                 rc = 0;
462         /* lqe_edquot flag is used to avoid flooding dqacq requests when
463          * the user is over quota, however, the lqe_edquot could be stale
464          * sometimes due to the race reply of dqacq vs. id lock glimpse
465          * (see LU-4505), so we revalidate it every 5 seconds. */
466         } else if (lqe->lqe_edquot &&
467                    (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
468                 rc = -EDQUOT;
469         }else {
470                 rc = -EAGAIN;
471         }
472         lqe_write_unlock(lqe);
473
474         RETURN(rc);
475 }
476
477 /**
478  * Compute how much quota space should be acquire from the master based
479  * on how much is currently granted to this slave and pending/waiting
480  * operations.
481  *
482  * \param lqe - is the lquota entry for which we would like to adjust quota
483  *              space.
484  * \param qbody - is the quota body of the acquire request to fill
485  *
486  * \retval true  - space acquisition is needed and qbody is filled
487  * \retval false - no space acquisition required
488  */
489 static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
490                                     struct quota_body *qbody)
491 {
492         __u64   usage, granted;
493
494         usage   = lqe->lqe_usage;
495         usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
496         granted = lqe->lqe_granted;
497
498         qbody->qb_flags = 0;
499
500         /* if we overconsumed quota space, we report usage in request so that
501          * master can adjust it unconditionally */
502         if (lqe->lqe_usage > lqe->lqe_granted) {
503                 qbody->qb_usage = lqe->lqe_usage;
504                 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
505                 granted = lqe->lqe_usage;
506         }
507
508         /* acquire as much as needed, but not more */
509         if (usage > granted) {
510                 qbody->qb_count  = usage - granted;
511                 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
512         }
513
514         return qbody->qb_flags != 0;
515 }
516
/**
 * Acquire quota space from master.
 * There are at most 1 in-flight dqacq/dqrel.
 *
 * \param env    - the environment passed by the caller
 * \param lqe    - is the qid entry to be processed
 *
 * \retval 0            - success
 * \retval -EDQUOT      - out of quota
 * \retval -EINPROGRESS - inform client to retry write/create
 * \retval -EBUSY       - already a quota request in flight
 * \retval -ve          - other appropriate errors
 */
static int qsd_acquire_remote(const struct lu_env *env,
			      struct lquota_entry *lqe)
{
	struct qsd_thread_info	*qti = qsd_info(env);
	struct quota_body	*qbody = &qti->qti_body;
	struct qsd_instance	*qsd;
	struct qsd_qtype_info	*qqi;
	int			 rc;
	ENTRY;

	memset(qbody, 0, sizeof(*qbody));
	/* bail out early if the qsd isn't ready to talk to the master;
	 * on success qb_glb_lockh is filled with the remote global lock
	 * handle */
	rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
	if (rc)
		RETURN(rc);

	qqi = lqe2qqi(lqe);
	qsd = qqi->qqi_qsd;

	lqe_write_lock(lqe);

	/* is quota really enforced for this id? */
	if (!lqe->lqe_enforced) {
		lqe_write_unlock(lqe);
		LQUOTA_DEBUG(lqe, "quota not enforced any more");
		RETURN(0);
	}

	/* fill qb_count & qb_flags */
	if (!qsd_calc_acquire(lqe, qbody)) {
		lqe_write_unlock(lqe);
		LQUOTA_DEBUG(lqe, "No acquire required");
		RETURN(0);
	}

	/* check whether an acquire request completed recently; if so, reuse
	 * its result instead of flooding the master (lqe_acq_rc/lqe_acq_time
	 * are set by qsd_req_completion()) */
	if (lqe->lqe_acq_rc != 0 &&
	    lqe->lqe_acq_time > ktime_get_seconds() - 1) {
		lqe_write_unlock(lqe);
		LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
		RETURN(lqe->lqe_acq_rc);
	}

	/* only 1 quota request in flight for a given ID is allowed */
	rc = qsd_request_enter(lqe);
	if (rc) {
		lqe_write_unlock(lqe);
		RETURN(rc);
	}

	lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
	lqe_write_unlock(lqe);

	/* hold a refcount until completion; released by
	 * qsd_req_completion() */
	lqe_getref(lqe);

	/* fill other quota body fields */
	qbody->qb_fid = qqi->qqi_fid;
	qbody->qb_id  = lqe->lqe_id;

	/* check whether we already own a valid lock for this ID */
	rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
	if (rc) {
		struct lquota_lvb *lvb;

		OBD_ALLOC_PTR(lvb);
		if (lvb == NULL) {
			rc = -ENOMEM;
			/* the completion callback also performs the cleanup
			 * (request exit, lqe ref drop) on this error path */
			qsd_req_completion(env, qqi, qbody, NULL,
					   &qti->qti_lockh, NULL, lqe, rc);
			RETURN(rc);
		}
		/* no lock found, should use intent */
		rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
				     IT_QUOTA_DQACQ, qsd_req_completion,
				     qqi, lvb, (void *)lqe);
	} else {
		/* lock found, should use regular dqacq */
		rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
				    qsd_req_completion, qqi, &qti->qti_lockh,
				    lqe);
	}

	/* the completion function will be called by qsd_send_dqacq or
	 * qsd_intent_lock */
	RETURN(rc);
}
616
/**
 * Acquire \a space of quota space in order to complete an operation.
 * Try to consume local quota space first and send acquire request to quota
 * master if required.
 *
 * \param env   - the environment passed by the caller
 * \param lqe   - is the qid entry to be processed
 * \param space - is the amount of quota required for the operation
 * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
 *
 * \retval true  - stop waiting in wait_event_idle_timeout,
 *                 and real return value in \a ret
 * \retval false - continue waiting
 */
static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
			long long space, int *ret)
{
	int rc = 0, count;
	int wait_pending = 0;
	struct qsd_qtype_info *qqi = lqe2qqi(lqe);

	ENTRY;

	/* loop until local space is consumed, a hard error occurs, or a
	 * remote request is left in flight (-EBUSY) */
	for (count = 0; rc == 0; count++) {
		LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
again:
		if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
			rc = -EINPROGRESS;
			break;
		}

		/* refresh disk usage */
		rc = qsd_refresh_usage(env, lqe);
		if (rc)
			break;

		/* try to consume local quota space first */
		rc = qsd_acquire_local(lqe, space);
		if (rc != -EAGAIN)
			/* rc == 0, Wouhou! enough local quota space
			 * rc < 0, something bad happened */
			 break;
		/*
		 * There might be a window that commit transaction
		 * have updated usage but pending write doesn't change
		 * wait for it before acquiring remotely.
		 */
		if (lqe->lqe_pending_write >= space && !wait_pending) {
			wait_pending = 1;
			/* force a sync once, then re-check from the top */
			dt_sync(env, qqi->qqi_qsd->qsd_dev);
			goto again;
		}

		/* if we have gotten some quota and still wait more quota,
		 * it's better to give QMT some time to reclaim from clients */
		if (count > 0)
			schedule_timeout_interruptible(cfs_time_seconds(1));

		/* need to acquire more quota space from master */
		rc = qsd_acquire_remote(env, lqe);
	}

	if (rc == -EBUSY)
		/* already a request in flight, continue waiting */
		RETURN(false);
	*ret = rc;
	RETURN(true);
}
685
686 /**
687  * Quota enforcement handler. If local quota can satisfy this operation,
688  * return success, otherwise, acquire more quota from master.
689  * (for write operation, if master isn't available at this moment, return
690  * -EINPROGRESS to inform client to retry the write)
691  *
692  * \param env   - the environment passed by the caller
693  * \param qsd   - is the qsd instance associated with the device in charge
694  *                of the operation.
695  * \param qid   - is the qid information attached in the transaction handle
696  * \param space - is the space required by the operation
697  * \param flags - if the operation is write, return caller no user/group
698  *                and sync commit flags
699  *
700  * \retval 0            - success
701  * \retval -EDQUOT      - out of quota
702  * \retval -EINPROGRESS - inform client to retry write
703  * \retval -ve          - other appropriate errors
704  */
705 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
706                          struct lquota_id_info *qid, long long space,
707                          enum osd_quota_local_flags *local_flags)
708 {
709         struct lquota_entry *lqe;
710         DEFINE_WAIT_FUNC(wait, woken_wake_function);
711         enum osd_quota_local_flags qtype_flag = 0;
712         int rc = 0, ret = -EINPROGRESS;
713         long remaining;
714
715         ENTRY;
716         if (qid->lqi_qentry != NULL) {
717                 /* we already had to deal with this id for this transaction */
718                 lqe = qid->lqi_qentry;
719                 if (!lqe->lqe_enforced)
720                         RETURN(0);
721         } else {
722                 /* look up lquota entry associated with qid */
723                 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
724                 if (IS_ERR(lqe))
725                         RETURN(PTR_ERR(lqe));
726                 if (!lqe->lqe_enforced) {
727                         lqe_putref(lqe);
728                         RETURN(0);
729                 }
730                 qid->lqi_qentry = lqe;
731                 /* lqe will be released in qsd_op_end() */
732         }
733
734         if (space <= 0) {
735                 /* when space is negative or null, we don't need to consume
736                  * quota space. That said, we still want to perform space
737                  * adjustments in qsd_op_end, so we return here, but with
738                  * a reference on the lqe */
739                 if (local_flags != NULL) {
740                         rc = qsd_refresh_usage(env, lqe);
741                         GOTO(out_flags, rc);
742                 }
743                 RETURN(0);
744         }
745
746         LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
747
748         lqe_write_lock(lqe);
749         lqe->lqe_waiting_write += space;
750         lqe_write_unlock(lqe);
751
752         /* acquire quota space for the operation, cap overall wait time to
753          * prevent a service thread from being stuck for too long
754          */
755         remaining = cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd));
756         add_wait_queue(&lqe->lqe_waiters, &wait);
757         do {
758                 if (qsd_acquire(env, lqe, space, &ret))
759                         break;
760
761                 remaining = wait_woken(&wait, TASK_IDLE, remaining);
762         } while (remaining > 0);
763
764         if (remaining > 0 && ret == 0) {
765                 qid->lqi_space += space;
766                 rc = 0;
767         } else {
768                 if (remaining > 0)
769                         rc = ret;
770                 else if (remaining == 0)
771                         rc = -ETIMEDOUT;
772
773                 LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
774
775                 lqe_write_lock(lqe);
776                 lqe->lqe_waiting_write -= space;
777
778                 if (local_flags && lqe->lqe_pending_write != 0)
779                         /* Inform OSD layer that there are pending writes.
780                          * It might want to retry after a sync if appropriate
781                          */
782                          *local_flags |= QUOTA_FL_SYNC;
783                 lqe_write_unlock(lqe);
784
785                 /* convert recoverable error into -EINPROGRESS, client will
786                  * retry
787                  */
788                 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
789                     rc == -EAGAIN || rc == -EINTR) {
790                         rc = -EINPROGRESS;
791                 } else if (rc == -ESRCH) {
792                         rc = 0;
793                         LQUOTA_ERROR(lqe,
794                                      "ID isn't enforced on master, it probably due to a legeal race, if this message is showing up constantly, there could be some inconsistence between master & slave, and quota reintegration needs be re-triggered.");
795                 }
796         }
797         remove_wait_queue(&lqe->lqe_waiters, &wait);
798
799         if (local_flags != NULL) {
800 out_flags:
801                 LASSERT(qid->lqi_is_blk);
802                 if (rc != 0) {
803                         *local_flags |= lquota_over_fl(qqi->qqi_qtype);
804                 } else {
805                         __u64   usage;
806
807                         lqe_read_lock(lqe);
808                         usage = lqe->lqe_pending_write;
809                         usage += lqe->lqe_waiting_write;
810                         /* There is a chance to successfully grant more quota
811                          * but get edquot flag through glimpse. */
812                         if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
813                            (usage % lqe->lqe_qunit >
814                             qqi->qqi_qsd->qsd_sync_threshold)))
815                                 usage += qqi->qqi_qsd->qsd_sync_threshold;
816
817                         usage += lqe->lqe_usage;
818
819                         qtype_flag = lquota_over_fl(qqi->qqi_qtype);
820                         /* if we should notify client to start sync write */
821                         if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
822                                 *local_flags |= qtype_flag;
823                         else
824                                 *local_flags &= ~qtype_flag;
825                         lqe_read_unlock(lqe);
826                 }
827         }
828         RETURN(rc);
829 }
830
831 /**
832  * helper function comparing two lquota_id_info structures
833  */
834 static inline bool qid_equal(struct lquota_id_info *q1,
835                              struct lquota_id_info *q2)
836 {
837         if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
838                 return false;
839         return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
840 }
841
842 /**
843  * Enforce quota, it's called in the declaration of each operation.
844  * qsd_op_end() will then be called later once all the operations have been
845  * completed in order to release/adjust the quota space.
846  *
847  * \param env   - the environment passed by the caller
848  * \param qsd   - is the qsd instance associated with the device in charge of
849  *                the operation.
850  * \param trans - is the quota transaction information
851  * \param qi    - qid & space required by current operation
852  * \param flags - if the operation is write, return caller no user/group and
853  *                sync commit flags
854  *
855  * \retval 0            - success
856  * \retval -EDQUOT      - out of quota
857  * \retval -EINPROGRESS - inform client to retry write
858  * \retval -ve          - other appropriate errors
859  */
860 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
861                  struct lquota_trans *trans, struct lquota_id_info *qi,
862                  enum osd_quota_local_flags *local_flags)
863 {
864         int     i, rc;
865         bool    found = false;
866         ENTRY;
867
868         /* fast path, ignore quota enforcement request for root owned files */
869         if (qi->lqi_id.qid_uid == 0)
870                 return 0;
871
872         if (unlikely(qsd == NULL))
873                 RETURN(0);
874
875         if (qsd->qsd_dev->dd_rdonly)
876                 RETURN(0);
877
878         /* We don't enforce quota until the qsd_instance is started */
879         read_lock(&qsd->qsd_lock);
880         if (!qsd->qsd_started) {
881                 read_unlock(&qsd->qsd_lock);
882                 RETURN(0);
883         }
884         read_unlock(&qsd->qsd_lock);
885
886         /* ignore block quota on MDTs, ignore inode quota on OSTs */
887         if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
888             (qsd->qsd_is_md && qi->lqi_is_blk))
889                 RETURN(0);
890
891         /* ignore quota enforcement request when:
892          *    - quota isn't enforced for this quota type
893          * or - the user/group is root
894          * or - quota accounting isn't enabled */
895         if (!qsd_type_enabled(qsd, qi->lqi_type) ||
896             (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
897                 RETURN(0);
898
899         if (qi->lqi_type == PRJQUOTA && local_flags && qi->lqi_id.qid_projid &&
900             (qsd->qsd_root_prj_enable || !qi->lqi_ignore_root_proj_quota))
901                 *local_flags |= QUOTA_FL_ROOT_PRJQUOTA;
902
903         LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
904                  trans->lqt_id_cnt);
905         /* check whether we already allocated a slot for this id */
906         for (i = 0; i < trans->lqt_id_cnt; i++) {
907                 if (qid_equal(qi, &trans->lqt_ids[i])) {
908                         found = true;
909                         break;
910                 }
911         }
912
913         if (!found) {
914                 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
915                         CERROR("%s: more than %d qids enforced for a "
916                                "transaction?\n", qsd->qsd_svname, i);
917                         RETURN(-EINVAL);
918                 }
919
920                 /* fill new slot */
921                 trans->lqt_ids[i].lqi_id     = qi->lqi_id;
922                 trans->lqt_ids[i].lqi_type   = qi->lqi_type;
923                 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
924                 trans->lqt_id_cnt++;
925         }
926
927         /* manage quota enforcement for this ID */
928         rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
929                            &trans->lqt_ids[i], qi->lqi_space, local_flags);
930         RETURN(rc);
931 }
932 EXPORT_SYMBOL(qsd_op_begin);
933
934 /**
935  * Adjust quota space (by acquiring or releasing) hold by the quota slave.
936  * This function is called after each quota request completion and during
937  * reintegration in order to report usage or re-acquire quota locks.
938  * Space adjustment is aborted if there is already a quota request in flight
939  * for this ID.
940  *
941  * \param env    - the environment passed by the caller
942  * \param lqe    - is the qid entry to be processed
943  *
944  * \retval 0 on success, appropriate errors on failure
945  */
int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
{
        struct qsd_thread_info  *qti = qsd_info(env);
        struct quota_body       *qbody = &qti->qti_body;
        struct qsd_instance     *qsd;
        struct qsd_qtype_info   *qqi;
        int                      rc;
        bool                     intent = false;
        ENTRY;

        memset(qbody, 0, sizeof(*qbody));
        /* verify the slave is ready to talk to the master and fetch the
         * global lock handle to pack in the request */
        rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
        if (rc) {
                /* add to adjust list again to trigger adjustment later when
                 * slave is ready */
                LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
                qsd_adjust_schedule(lqe, true, false);
                RETURN(0);
        }

        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;

        /* no adjustment on a read-only backend */
        if (qsd->qsd_dev->dd_rdonly)
                RETURN(0);

        lqe_write_lock(lqe);

        /* fill qb_count & qb_flags */
        if (!qsd_calc_adjust(lqe, qbody)) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "no adjustment required");
                RETURN(0);
        }

        /* only 1 quota request in flight for a given ID is allowed */
        rc = qsd_request_enter(lqe);
        if (rc) {
                /* already a request in flight, space adjustment will be run
                 * again on request completion */
                lqe_write_unlock(lqe);
                RETURN(0);
        }

        /* a release is tracked in lqe_pending_rel until the request
         * completes (see qsd_request_exit()) */
        if (req_is_rel(qbody->qb_flags))
                lqe->lqe_pending_rel = qbody->qb_count;
        /* snapshot the per-ID lock handle while still under the write lock */
        lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
        lqe_write_unlock(lqe);

        /* hold a refcount until completion */
        lqe_getref(lqe);

        /* fill other quota body fields */
        qbody->qb_fid = qqi->qqi_fid;
        qbody->qb_id  = lqe->lqe_id;

        if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
                /* check whether we own a valid lock for this ID */
                rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
                if (rc) {
                        memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
                        if (req_is_preacq(qbody->qb_flags)) {
                                if (req_has_rep(qbody->qb_flags))
                                        /* still want to report usage */
                                        qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
                                else
                                        /* no pre-acquire if no per-ID lock */
                                        GOTO(out, rc = -ENOLCK);
                        } else {
                                /* no lock found, should use intent */
                                intent = true;
                        }
                } else if (req_is_acq(qbody->qb_flags) &&
                           qbody->qb_count == 0) {
                        /* found cached lock, no need to acquire */
                        GOTO(out, rc = 0);
                }
        } else {
                /* release and report don't need a per-ID lock */
                memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
        }

        if (!intent) {
                /* per-ID lock held: plain DQACQ RPC is enough */
                rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
                                    qsd_req_completion, qqi, &qti->qti_lockh,
                                    lqe);
        } else {
                /* no per-ID lock: acquire one along with the quota space via
                 * an intent lock; lvb is freed by the completion path */
                struct lquota_lvb *lvb;

                OBD_ALLOC_PTR(lvb);
                if (lvb == NULL)
                        GOTO(out, rc = -ENOMEM);

                rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
                                     IT_QUOTA_DQACQ, qsd_req_completion,
                                     qqi, lvb, (void *)lqe);
        }
        /* the completion function will be called by qsd_send_dqacq or
         * qsd_intent_lock */
        RETURN(rc);
out:
        /* error or nothing to send: run the completion handler directly so
         * lqe_pending_req and the lqe refcount are dropped */
        qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
                           rc);
        return rc;
}
1051
/**
 * Post quota operation, pre-acquire/release quota from master.
 *
 * \param  env  - the environment passed by the caller; may be NULL, in
 *                which case any needed adjustment is delegated to a
 *                separate thread context
 * \param  qqi  - is the qsd_qtype_info structure associated with the quota ID
 *                subject to the operation
 * \param  qid  - stores information related to this ID for the operation
 *                which has just completed
 */
static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                        struct lquota_id_info *qid)
{
        struct lquota_entry     *lqe;
        bool                     adjust;
        ENTRY;

        /* nothing to do if qsd_op_begin0() didn't attach an entry */
        lqe = qid->lqi_qentry;
        if (lqe == NULL)
                RETURN_EXIT;
        qid->lqi_qentry = NULL;

        /* refresh cached usage if a suitable environment is passed */
        if (env != NULL)
                qsd_refresh_usage(env, lqe);

        lqe_write_lock(lqe);
        /* return the space consumed by this operation; pending writes
         * should never drop below what we are releasing */
        if (qid->lqi_space > 0) {
                if (lqe->lqe_pending_write < qid->lqi_space) {
                        LQUOTA_ERROR(lqe,
                                     "More pending write quota to reduce (pending %llu, space %lld)\n",
                                     lqe->lqe_pending_write, qid->lqi_space);
                        lqe->lqe_pending_write = 0;
                } else {
                        lqe->lqe_pending_write -= qid->lqi_space;
                }
        }
        /* without an environment we can't evaluate whether adjustment is
         * needed, so assume it is and let the dedicated thread decide */
        if (env != NULL)
                adjust = qsd_adjust_needed(lqe);
        else
                adjust = true;
        lqe_write_unlock(lqe);

        if (adjust) {
                /* pre-acquire/release quota space is needed */
                if (env != NULL)
                        qsd_adjust(env, lqe);
                else
                        /* no suitable environment, handle adjustment in
                         * separate thread context */
                        qsd_adjust_schedule(lqe, false, false);
        }
        /* drop the reference taken by qsd_op_begin0() */
        lqe_putref(lqe);
        EXIT;
}
1111
/**
 * Post quota operation. It's called after each operation transaction stopped.
 *
 * \param  env   - the environment passed by the caller
 * \param  qsd   - is the qsd instance associated with device which is handling
 *                 the operation.
 * \param  trans - is the quota transaction holding all qid information
 *                 attached by qsd_op_begin()
 */
1124 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
1125                 struct lquota_trans *trans)
1126 {
1127         int i;
1128         ENTRY;
1129
1130         if (unlikely(qsd == NULL))
1131                 RETURN_EXIT;
1132
1133         if (qsd->qsd_dev->dd_rdonly)
1134                 RETURN_EXIT;
1135
1136         /* We don't enforce quota until the qsd_instance is started */
1137         read_lock(&qsd->qsd_lock);
1138         if (!qsd->qsd_started) {
1139                 read_unlock(&qsd->qsd_lock);
1140                 RETURN_EXIT;
1141         }
1142         read_unlock(&qsd->qsd_lock);
1143
1144         LASSERT(trans != NULL);
1145
1146         for (i = 0; i < trans->lqt_id_cnt; i++) {
1147                 struct qsd_qtype_info *qqi;
1148
1149                 if (trans->lqt_ids[i].lqi_qentry == NULL)
1150                         continue;
1151
1152                 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
1153                 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
1154         }
1155
1156         /* reset id_count to 0 so that a second accidental call to qsd_op_end()
1157          * does not result in failure */
1158         trans->lqt_id_cnt = 0;
1159         EXIT;
1160 }
1161 EXPORT_SYMBOL(qsd_op_end);
1162
1163 /* Simple wrapper on top of qsd API which implement quota transfer for osd
1164  * setattr needs. As a reminder, only the root user can change ownership of
1165  * a file, that's why EDQUOT & EINPROGRESS errors are discarded
1166  */
1167 int qsd_transfer(const struct lu_env *env, struct qsd_instance *qsd,
1168                  struct lquota_trans *trans, unsigned int qtype,
1169                  u64 orig_id, u64 new_id, u64 bspace,
1170                  struct lquota_id_info *qi)
1171 {
1172         int rc;
1173
1174         if (unlikely(!qsd))
1175                 return 0;
1176
1177         LASSERT(qtype < LL_MAXQUOTAS);
1178         if (qtype == PRJQUOTA)
1179                 if (!projid_valid(make_kprojid(&init_user_ns, new_id)))
1180                         return -EINVAL;
1181
1182         qi->lqi_type = qtype;
1183
1184         /* inode accounting */
1185         qi->lqi_is_blk = false;
1186
1187         /* one more inode for the new owner ... */
1188         qi->lqi_id.qid_uid = new_id;
1189         qi->lqi_space = 1;
1190         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1191         if (rc == -EDQUOT || rc == -EINPROGRESS)
1192                 rc = 0;
1193         if (rc)
1194                 return rc;
1195
1196         /* and one less inode for the current id */
1197         qi->lqi_id.qid_uid = orig_id;
1198         qi->lqi_space = -1;
1199         /* can't get EDQUOT when reducing usage */
1200         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1201         if (rc == -EINPROGRESS)
1202                 rc = 0;
1203         if (rc)
1204                 return rc;
1205
1206         /* block accounting */
1207         qi->lqi_is_blk = true;
1208
1209         /* more blocks for the new owner ... */
1210         qi->lqi_id.qid_uid = new_id;
1211         qi->lqi_space = bspace;
1212         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1213         if (rc == -EDQUOT || rc == -EINPROGRESS)
1214                 rc = 0;
1215         if (rc)
1216                 return rc;
1217
1218         /* and finally less blocks for the current owner */
1219         qi->lqi_id.qid_uid = orig_id;
1220         qi->lqi_space = -bspace;
1221         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1222         /* can't get EDQUOT when reducing usage */
1223         if (rc == -EINPROGRESS)
1224                 rc = 0;
1225         return rc;
1226 }
1227 EXPORT_SYMBOL(qsd_transfer);
1228
1229 /**
1230  * Trigger pre-acquire/release if necessary.
1231  * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
1232  * quota accounting isn't updated when the transaction stopped. Instead, it'll
1233  * be updated on the final iput, so qsd_op_adjust() will be called then (in
1234  * osd_object_delete()) to trigger quota release if necessary.
1235  *
1236  * \param env - the environment passed by the caller
1237  * \param qsd - is the qsd instance associated with the device in charge
1238  *              of the operation.
1239  * \param qid - is the lquota ID of the user/group for which to trigger
1240  *              quota space adjustment
1241  * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
1242  */
1243 void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
1244                    union lquota_id *qid, int qtype)
1245 {
1246         struct lquota_entry    *lqe;
1247         struct qsd_qtype_info  *qqi;
1248         bool                    adjust;
1249         ENTRY;
1250
1251         if (unlikely(qsd == NULL))
1252                 RETURN_EXIT;
1253
1254         /* We don't enforce quota until the qsd_instance is started */
1255         read_lock(&qsd->qsd_lock);
1256         if (!qsd->qsd_started) {
1257                 read_unlock(&qsd->qsd_lock);
1258                 RETURN_EXIT;
1259         }
1260         read_unlock(&qsd->qsd_lock);
1261
1262         qqi = qsd->qsd_type_array[qtype];
1263         LASSERT(qqi);
1264
1265         if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
1266             qid->qid_uid == 0 || qsd->qsd_dev->dd_rdonly)
1267                 RETURN_EXIT;
1268
1269         read_lock(&qsd->qsd_lock);
1270         if (!qsd->qsd_started) {
1271                 read_unlock(&qsd->qsd_lock);
1272                 RETURN_EXIT;
1273         }
1274         read_unlock(&qsd->qsd_lock);
1275
1276         lqe = lqe_locate(env, qqi->qqi_site, qid);
1277         if (IS_ERR(lqe)) {
1278                 CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
1279                        qsd->qsd_svname, qid->qid_uid, qtype);
1280                 RETURN_EXIT;
1281         }
1282
1283         qsd_refresh_usage(env, lqe);
1284
1285         lqe_read_lock(lqe);
1286         adjust = qsd_adjust_needed(lqe);
1287         lqe_read_unlock(lqe);
1288
1289         if (adjust)
1290                 qsd_adjust(env, lqe);
1291
1292         lqe_putref(lqe);
1293         EXIT;
1294 }
1295 EXPORT_SYMBOL(qsd_op_adjust);
1296
1297 /**
1298  * Reserve or free quota.
1299  *
1300  * Currently, It's used to reserve quota space before changing the file's group
1301  * for normal user and free the reserved quota after the group change.
1302  *
1303  * \param env     - the environment passed by the caller
1304  * \param qsd     - is the qsd instance associated with the device in charge of
1305  *                  the operation.
1306  * \param qi      - qid & space required by current operation
1307  *
1308  * \retval 0            - success
1309  * \retval -EDQUOT      - out of quota
1310  * \retval -EINPROGRESS - inform client to retry write
1311  * \retval -ve          - other appropriate errors
1312  */
int qsd_reserve_or_free_quota(const struct lu_env *env,
                              struct qsd_instance *qsd,
                              struct lquota_id_info *qi)
{
        struct qsd_qtype_info  *qqi;
        /* a negative lqi_space requests a free of previously reserved space,
         * a positive one requests a reservation */
        bool is_free = qi->lqi_space < 0;
        int rc = 0;

        ENTRY;

        if (unlikely(qsd == NULL))
                RETURN(0);

        if (qsd->qsd_dev->dd_rdonly)
                RETURN(0);

        /* work with the absolute amount from here on */
        if (is_free)
                qi->lqi_space *= -1;

        /* We don't enforce quota until the qsd_instance is started */
        read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
                read_unlock(&qsd->qsd_lock);
                RETURN(0);
        }
        read_unlock(&qsd->qsd_lock);

        qqi = qsd->qsd_type_array[qi->lqi_type];
        LASSERT(qqi);

        CDEBUG(D_QUOTA, "type %s, acct %s, free %d, count %llu\n",
               qsd_type_enabled(qsd, qi->lqi_type) ? "enabled" : "disabled",
               (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed ? "failed" :
               "succeed", is_free, qi->lqi_space);

        /* ignore quota enforcement request when:
         *    - quota isn't enforced for this quota type
         * or - the user/group is root
         * or - quota accounting isn't enabled
         */
        if (!is_free &&
            (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
              (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed))
                RETURN(0);

        if (is_free) {
                /* releasing: hand the entry back through the op_end path */
                qsd_op_end0(env, qsd->qsd_type_array[qi->lqi_type], qi);
        } else {
                long long qspace = qi->lqi_space;

                /* the acquired quota will add to lqi_space in qsd_op_begin0 */
                qi->lqi_space = 0;
                rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], qi,
                                   qspace, NULL);
                /* on failure drop the lqe reference qsd_op_begin0 attached,
                 * since no matching free will follow */
                if (rc && qi->lqi_qentry) {
                        lqe_putref(qi->lqi_qentry);
                        qi->lqi_qentry = NULL;
                }
        }

        CDEBUG(D_QUOTA, "%s quota: type %i, uid %llu, gid %llu, space %llu\n",
               is_free ? "Free" : "Reserve", qi->lqi_type, qi->lqi_id.qid_uid,
               qi->lqi_id.qid_gid, qi->lqi_space);

        RETURN(rc);
}
1379 EXPORT_SYMBOL(qsd_reserve_or_free_quota);