Whamcloud - gitweb
LU-14927 quota: move qsd_transfer to lquota module
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include "qsd_internal.h"
34
35 /**
36  * helper function bumping lqe_pending_req if there is no quota request in
37  * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
38  */
39 static inline int qsd_request_enter(struct lquota_entry *lqe)
40 {
41         /* is there already a quota request in flight? */
42         if (lqe->lqe_pending_req != 0) {
43                 LQUOTA_DEBUG(lqe, "already a request in flight");
44                 return -EBUSY;
45         }
46
47         if (lqe->lqe_pending_rel != 0) {
48                 LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
49                              lqe->lqe_pending_rel);
50                 LBUG();
51         }
52
53         lqe->lqe_pending_req++;
54         return 0;
55 }
56
57 /**
58  * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
59  */
60 static inline void qsd_request_exit(struct lquota_entry *lqe)
61 {
62         if (lqe->lqe_pending_req != 1) {
63                 LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
64                 LBUG();
65         }
66         lqe->lqe_pending_req--;
67         lqe->lqe_pending_rel = 0;
68         wake_up(&lqe->lqe_waiters);
69 }
70
71 /**
72  * Check whether a qsd instance is all set to send quota request to master.
73  * This includes checking whether:
74  * - the connection to master is set up and usable,
75  * - the qsd isn't stopping
76  * - reintegration has been successfully completed and all indexes are
77  *   up-to-date
78  *
79  * \param lqe - is the lquota entry for which we would like to send an quota
80  *              request
81  * \param lockh - is the remote handle of the global lock returned on success
82  *
83  * \retval 0 on success, appropriate error on failure
84  */
85 static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
86 {
87         struct qsd_qtype_info   *qqi = lqe2qqi(lqe);
88         struct qsd_instance     *qsd = qqi->qqi_qsd;
89         struct obd_import       *imp = NULL;
90         struct ldlm_lock        *lock;
91         ENTRY;
92
93         read_lock(&qsd->qsd_lock);
94         /* is the qsd about to shut down? */
95         if (qsd->qsd_stopping) {
96                 read_unlock(&qsd->qsd_lock);
97                 LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
98                 /* Target is about to shut down, client will retry */
99                 RETURN(-EINPROGRESS);
100         }
101
102         /* is the connection to the quota master ready? */
103         if (qsd->qsd_exp_valid)
104                 imp = class_exp2cliimp(qsd->qsd_exp);
105         if (imp == NULL || imp->imp_invalid) {
106                 read_unlock(&qsd->qsd_lock);
107                 LQUOTA_DEBUG(lqe, "connection to master not ready");
108                 RETURN(-ENOTCONN);
109         }
110
111         /* In most case, reintegration must have been triggered (when enable
112          * quota or on OST start), however, in rare race condition (enabling
113          * quota when starting OSTs), we might miss triggering reintegration
114          * for some qqi.
115          *
116          * If the previous reintegration failed for some reason, we'll
117          * re-trigger it here as well. */
118         if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
119                 read_unlock(&qsd->qsd_lock);
120                 LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
121                              "kicking off reintegration");
122                 qsd_start_reint_thread(qqi);
123                 RETURN(-EINPROGRESS);
124         }
125
126         /* Fill the remote global lock handle, master will check this handle
127          * to see if the slave is sending request with stale lock */
128         lustre_handle_copy(lockh, &qqi->qqi_lockh);
129         read_unlock(&qsd->qsd_lock);
130
131         if (!lustre_handle_is_used(lockh))
132                 RETURN(-ENOLCK);
133
134         lock = ldlm_handle2lock(lockh);
135         if (lock == NULL)
136                 RETURN(-ENOLCK);
137
138         /* return remote lock handle to be packed in quota request */
139         lustre_handle_copy(lockh, &lock->l_remote_handle);
140         LDLM_LOCK_PUT(lock);
141
142         RETURN(0);
143 }
144
145 /**
146  * Check whether any quota space adjustment (pre-acquire/release/report) is
147  * needed for a given quota ID. If a non-null \a qbody is passed, then the
148  * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
149  * to be packed in the quota request.
150  *
151  * \param lqe   - is the lquota entry for which we would like to adjust quota
152  *                space.
153  * \param qbody - is the quota body to fill, if not NULL.
154  *
155  * \retval true  - space adjustment is required and \a qbody is filled, if not
156  *                 NULL
157  * \retval false - no space adjustment required
158  */
159 static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
160 {
161         __u64   usage, granted;
162         ENTRY;
163
164         usage   = lqe->lqe_usage;
165         usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
166         granted = lqe->lqe_granted;
167
168         if (qbody != NULL)
169                 qbody->qb_flags = 0;
170
171         if (!lqe->lqe_enforced) {
172                 /* quota not enforced any more for this ID */
173                 if (granted != 0) {
174                         /* release all quota space unconditionally */
175                         LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
176                         if (qbody != NULL) {
177                                 qbody->qb_count = granted;
178                                 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
179                         }
180                         RETURN(true);
181                 }
182                 RETURN(false);
183         }
184
185         if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
186                 /* No valid per-ID lock
187                  * When reporting quota (during reintegration or on setquota
188                  * glimpse), we should release granted space if usage is 0.
189                  * Otherwise, if the usage is less than granted, we need to
190                  * acquire the per-ID lock to make sure the unused grant can be
191                  * reclaimed by per-ID lock glimpse. */
192                 if (usage == 0) {
193                         /* no on-disk usage and no outstanding activity, release
194                          * space */
195                         if (granted != 0) {
196                                 LQUOTA_DEBUG(lqe, "no usage, releasing all "
197                                              "space");
198                                 if (qbody != NULL) {
199                                         qbody->qb_count = granted;
200                                         qbody->qb_flags = QUOTA_DQACQ_FL_REL;
201                                 }
202                                 RETURN(true);
203                         }
204                         LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
205                                      "do");
206                         RETURN(false);
207                 }
208
209                 if (lqe->lqe_usage < lqe->lqe_granted) {
210                         /* holding quota space w/o any lock, enqueue per-ID lock
211                          * again */
212                         LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
213                         if (qbody != NULL) {
214                                 qbody->qb_count = 0;
215                                 qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
216                         }
217                         RETURN(true);
218                 }
219
220                 if (lqe->lqe_usage > lqe->lqe_granted) {
221                         /* quota overrun, report usage */
222                         LQUOTA_DEBUG(lqe, "overrun, reporting usage");
223                         if (qbody != NULL) {
224                                 qbody->qb_usage = lqe->lqe_usage;
225                                 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
226                         }
227                         RETURN(true);
228                 }
229                 LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
230                 RETURN(false);
231         }
232
233         /* valid per-ID lock
234          * Apply good old quota qunit adjustment logic which has been around
235          * since lustre 1.4:
236          * 1. release spare quota space? */
237         if (granted > usage + lqe->lqe_qunit) {
238                 /* pre-release quota space */
239                 if (qbody == NULL)
240                         RETURN(true);
241                 qbody->qb_count = granted - usage;
242                 /* if usage == 0, release all granted space */
243                 if (usage) {
244                         /* try to keep one qunit of quota space */
245                         qbody->qb_count -= lqe->lqe_qunit;
246                         /* but don't release less than qtune to avoid releasing
247                          * space too often */
248                         if (qbody->qb_count < lqe->lqe_qtune)
249                                 qbody->qb_count = lqe->lqe_qtune;
250                 }
251                 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
252                 RETURN(true);
253         }
254
255         /* 2. Any quota overrun? */
256         if (lqe->lqe_usage > lqe->lqe_granted) {
257                 /* we overconsumed quota space, we report usage in request so
258                  * that master can adjust it unconditionally */
259                 if (qbody == NULL)
260                         RETURN(true);
261                 qbody->qb_usage = lqe->lqe_usage;
262                 granted         = lqe->lqe_usage;
263                 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
264         }
265
266         /* 3. Time to pre-acquire? */
267         if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
268             lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
269                 /* To pre-acquire quota space, we report how much spare quota
270                  * space the slave currently owns, then the master will grant us
271                  * back how much we can pretend given the current state of
272                  * affairs */
273                 if (qbody == NULL)
274                         RETURN(true);
275                 if (granted <= usage)
276                         qbody->qb_count = 0;
277                 else
278                         qbody->qb_count = granted - usage;
279                 qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
280                 RETURN(true);
281         }
282
283         if (qbody != NULL)
284                 RETURN(qbody->qb_flags != 0);
285         else
286                 RETURN(false);
287 }
288
289 /**
290  * Helper function returning true when quota space need to be adjusted (some
291  * unused space should be free or pre-acquire) and false otherwise.
292  */
293 static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
294 {
295         return qsd_calc_adjust(lqe, NULL);
296 }
297
298 /**
299  * Callback function called when an acquire/release request sent to the master
300  * is completed
301  */
302 static void qsd_req_completion(const struct lu_env *env,
303                                struct qsd_qtype_info *qqi,
304                                struct quota_body *reqbody,
305                                struct quota_body *repbody,
306                                struct lustre_handle *lockh,
307                                struct lquota_lvb *lvb,
308                                void *arg, int ret)
309 {
310         struct lquota_entry     *lqe = (struct lquota_entry *)arg;
311         struct qsd_thread_info  *qti;
312         int                      rc;
313         bool                     adjust = false, cancel = false;
314         ENTRY;
315
316         LASSERT(qqi != NULL && lqe != NULL);
317
318         /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
319          * DT tags set. */
320         rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
321         if (rc) {
322                 LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
323                 lqe_write_lock(lqe);
324                 /* can't afford to adjust quota space with no suitable lu_env */
325                 GOTO(out_noadjust, rc);
326         }
327         qti = qsd_info(env);
328
329         lqe_write_lock(lqe);
330         LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
331                      reqbody->qb_flags);
332
333         /* despite -EDQUOT & -EINPROGRESS errors, the master might still
334          * grant us back quota space to adjust quota overrun */
335         if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
336                 if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
337                    ret != -ESHUTDOWN && ret != -EAGAIN)
338                         /* print errors only if return code is unexpected */
339                         LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
340                                      ret, reqbody->qb_flags);
341                 GOTO(out, ret);
342         }
343
344         /* Set the lqe_lockh */
345         if (lustre_handle_is_used(lockh) &&
346             !lustre_handle_equal(lockh, &lqe->lqe_lockh))
347                 lustre_handle_copy(&lqe->lqe_lockh, lockh);
348
349         /* If the replied qb_count is zero, it means master didn't process
350          * the DQACQ since the limit for this ID has been removed, so we
351          * should not update quota entry & slave index copy neither. */
352         if (repbody != NULL && repbody->qb_count != 0) {
353                 LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
354
355                 if (req_is_rel(reqbody->qb_flags)) {
356                         if (lqe->lqe_granted < repbody->qb_count) {
357                                 LQUOTA_ERROR(lqe, "can't release more space "
358                                              "than owned %llu<%llu",
359                                              lqe->lqe_granted,
360                                              repbody->qb_count);
361                                 lqe->lqe_granted = 0;
362                         } else {
363                                 lqe->lqe_granted -= repbody->qb_count;
364                         }
365                         /* Cancel the per-ID lock initiatively when there
366                          * isn't any usage & grant, which can avoid master
367                          * sending glimpse unnecessarily to this slave on
368                          * quota revoking */
369                         if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
370                             !lqe->lqe_waiting_write && !lqe->lqe_usage)
371                                 cancel = true;
372                 } else {
373                         lqe->lqe_granted += repbody->qb_count;
374                 }
375                 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
376                 lqe_write_unlock(lqe);
377
378                 /* Update the slave index file in the dedicated thread. So far,
379                  * We don't update the version of slave index copy on DQACQ.
380                  * No locking is necessary since nobody can change
381                  * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
382                 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
383                                  false);
384                 lqe_write_lock(lqe);
385         }
386
387         /* extract information from lvb */
388         if (ret == 0 && lvb != NULL) {
389                 if (lvb->lvb_id_qunit != 0)
390                         qsd_set_qunit(lqe, lvb->lvb_id_qunit);
391                 qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
392         } else if (repbody != NULL && repbody->qb_qunit != 0) {
393                 qsd_set_qunit(lqe, repbody->qb_qunit);
394         }
395
396         /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
397          * flooding the master with acquire request. Pre-acquire will be turned
398          * on again as soon as qunit is modified */
399         if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
400                 lqe->lqe_nopreacq = true;
401 out:
402         adjust = qsd_adjust_needed(lqe);
403         if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
404                 lqe->lqe_acq_rc = ret;
405                 lqe->lqe_acq_time = ktime_get_seconds();
406         }
407 out_noadjust:
408         qsd_request_exit(lqe);
409         lqe_write_unlock(lqe);
410
411         /* release reference on per-ID lock */
412         if (lustre_handle_is_used(lockh))
413                 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
414
415         if (cancel) {
416                 qsd_adjust_schedule(lqe, false, true);
417         } else if (adjust) {
418                 if (!ret || ret == -EDQUOT)
419                         qsd_adjust_schedule(lqe, false, false);
420                 else
421                         qsd_adjust_schedule(lqe, true, false);
422         }
423         lqe_putref(lqe);
424
425         if (lvb)
426                 OBD_FREE_PTR(lvb);
427         EXIT;
428 }
429
430 /**
431  * Try to consume local quota space.
432  *
433  * \param lqe   - is the qid entry to be processed
434  * \param space - is the amount of quota space needed to complete the operation
435  *
436  * \retval 0       - success
437  * \retval -EDQUOT - out of quota
438  * \retval -EAGAIN - need to acquire space from master
439  */
440 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
441 {
442         __u64   usage;
443         int     rc;
444         ENTRY;
445
446         if (!lqe->lqe_enforced)
447                 /* not enforced any more, we are good */
448                 RETURN(-ESRCH);
449
450         lqe_write_lock(lqe);
451         /* use latest usage */
452         usage = lqe->lqe_usage;
453         /* take pending write into account */
454         usage += lqe->lqe_pending_write;
455
456         if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
457                 /* Yay! we got enough space */
458                 lqe->lqe_pending_write += space;
459                 lqe->lqe_waiting_write -= space;
460                 rc = 0;
461         /* lqe_edquot flag is used to avoid flooding dqacq requests when
462          * the user is over quota, however, the lqe_edquot could be stale
463          * sometimes due to the race reply of dqacq vs. id lock glimpse
464          * (see LU-4505), so we revalidate it every 5 seconds. */
465         } else if (lqe->lqe_edquot &&
466                    (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
467                 rc = -EDQUOT;
468         }else {
469                 rc = -EAGAIN;
470         }
471         lqe_write_unlock(lqe);
472
473         RETURN(rc);
474 }
475
476 /**
477  * Compute how much quota space should be acquire from the master based
478  * on how much is currently granted to this slave and pending/waiting
479  * operations.
480  *
481  * \param lqe - is the lquota entry for which we would like to adjust quota
482  *              space.
483  * \param qbody - is the quota body of the acquire request to fill
484  *
485  * \retval true  - space acquisition is needed and qbody is filled
486  * \retval false - no space acquisition required
487  */
488 static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
489                                     struct quota_body *qbody)
490 {
491         __u64   usage, granted;
492
493         usage   = lqe->lqe_usage;
494         usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
495         granted = lqe->lqe_granted;
496
497         qbody->qb_flags = 0;
498
499         /* if we overconsumed quota space, we report usage in request so that
500          * master can adjust it unconditionally */
501         if (lqe->lqe_usage > lqe->lqe_granted) {
502                 qbody->qb_usage = lqe->lqe_usage;
503                 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
504                 granted = lqe->lqe_usage;
505         }
506
507         /* acquire as much as needed, but not more */
508         if (usage > granted) {
509                 qbody->qb_count  = usage - granted;
510                 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
511         }
512
513         return qbody->qb_flags != 0;
514 }
515
516 /**
517  * Acquire quota space from master.
518  * There are at most 1 in-flight dqacq/dqrel.
519  *
520  * \param env    - the environment passed by the caller
521  * \param lqe    - is the qid entry to be processed
522  *
523  * \retval 0            - success
524  * \retval -EDQUOT      - out of quota
525  * \retval -EINPROGRESS - inform client to retry write/create
526  * \retval -EBUSY       - already a quota request in flight
527  * \retval -ve          - other appropriate errors
528  */
529 static int qsd_acquire_remote(const struct lu_env *env,
530                               struct lquota_entry *lqe)
531 {
532         struct qsd_thread_info  *qti = qsd_info(env);
533         struct quota_body       *qbody = &qti->qti_body;
534         struct qsd_instance     *qsd;
535         struct qsd_qtype_info   *qqi;
536         int                      rc;
537         ENTRY;
538
539         memset(qbody, 0, sizeof(*qbody));
540         rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
541         if (rc)
542                 RETURN(rc);
543
544         qqi = lqe2qqi(lqe);
545         qsd = qqi->qqi_qsd;
546
547         lqe_write_lock(lqe);
548
549         /* is quota really enforced for this id? */
550         if (!lqe->lqe_enforced) {
551                 lqe_write_unlock(lqe);
552                 LQUOTA_DEBUG(lqe, "quota not enforced any more");
553                 RETURN(0);
554         }
555
556         /* fill qb_count & qb_flags */
557         if (!qsd_calc_acquire(lqe, qbody)) {
558                 lqe_write_unlock(lqe);
559                 LQUOTA_DEBUG(lqe, "No acquire required");
560                 RETURN(0);
561         }
562
563         /* check whether an acquire request completed recently */
564         if (lqe->lqe_acq_rc != 0 &&
565             lqe->lqe_acq_time > ktime_get_seconds() - 1) {
566                 lqe_write_unlock(lqe);
567                 LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
568                 RETURN(lqe->lqe_acq_rc);
569         }
570
571         /* only 1 quota request in flight for a given ID is allowed */
572         rc = qsd_request_enter(lqe);
573         if (rc) {
574                 lqe_write_unlock(lqe);
575                 RETURN(rc);
576         }
577
578         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
579         lqe_write_unlock(lqe);
580
581         /* hold a refcount until completion */
582         lqe_getref(lqe);
583
584         /* fill other quota body fields */
585         qbody->qb_fid = qqi->qqi_fid;
586         qbody->qb_id  = lqe->lqe_id;
587
588         /* check whether we already own a valid lock for this ID */
589         rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
590         if (rc) {
591                 struct lquota_lvb *lvb;
592
593                 OBD_ALLOC_PTR(lvb);
594                 if (lvb == NULL) {
595                         rc = -ENOMEM;
596                         qsd_req_completion(env, qqi, qbody, NULL,
597                                            &qti->qti_lockh, NULL, lqe, rc);
598                         RETURN(rc);
599                 }
600                 /* no lock found, should use intent */
601                 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
602                                      IT_QUOTA_DQACQ, qsd_req_completion,
603                                      qqi, lvb, (void *)lqe);
604         } else {
605                 /* lock found, should use regular dqacq */
606                 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
607                                     qsd_req_completion, qqi, &qti->qti_lockh,
608                                     lqe);
609         }
610
611         /* the completion function will be called by qsd_send_dqacq or
612          * qsd_intent_lock */
613         RETURN(rc);
614 }
615
616 /**
617  * Acquire \a space of quota space in order to complete an operation.
618  * Try to consume local quota space first and send acquire request to quota
619  * master if required.
620  *
621  * \param env   - the environment passed by the caller
622  * \param lqe   - is the qid entry to be processed
623  * \param space - is the amount of quota required for the operation
624  * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
625  *
626  * \retval true  - stop waiting in wait_event_idle_timeout,
627  *                 and real return value in \a ret
628  * \retval false - continue waiting
629  */
630 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
631                         long long space, int *ret)
632 {
633         int rc = 0, count;
634         int wait_pending = 0;
635         struct qsd_qtype_info *qqi = lqe2qqi(lqe);
636
637         ENTRY;
638
639         for (count = 0; rc == 0; count++) {
640                 LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
641 again:
642                 if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
643                         rc = -EINPROGRESS;
644                         break;
645                 }
646
647                 /* refresh disk usage */
648                 rc = qsd_refresh_usage(env, lqe);
649                 if (rc)
650                         break;
651
652                 /* try to consume local quota space first */
653                 rc = qsd_acquire_local(lqe, space);
654                 if (rc != -EAGAIN)
655                         /* rc == 0, Wouhou! enough local quota space
656                          * rc < 0, something bad happened */
657                          break;
658                 /*
659                  * There might be a window that commit transaction
660                  * have updated usage but pending write doesn't change
661                  * wait for it before acquiring remotely.
662                  */
663                 if (lqe->lqe_pending_write >= space && !wait_pending) {
664                         wait_pending = 1;
665                         dt_sync(env, qqi->qqi_qsd->qsd_dev);
666                         goto again;
667                 }
668
669                 /* if we have gotten some quota and stil wait more quota,
670                  * it's better to give QMT some time to reclaim from clients */
671                 if (count > 0)
672                         schedule_timeout_interruptible(cfs_time_seconds(1));
673
674                 /* need to acquire more quota space from master */
675                 rc = qsd_acquire_remote(env, lqe);
676         }
677
678         if (rc == -EBUSY)
679                 /* already a request in flight, continue waiting */
680                 RETURN(false);
681         *ret = rc;
682         RETURN(true);
683 }
684
685 /**
686  * Quota enforcement handler. If local quota can satisfy this operation,
687  * return success, otherwise, acquire more quota from master.
688  * (for write operation, if master isn't available at this moment, return
689  * -EINPROGRESS to inform client to retry the write)
690  *
691  * \param env   - the environment passed by the caller
692  * \param qsd   - is the qsd instance associated with the device in charge
693  *                of the operation.
694  * \param qid   - is the qid information attached in the transaction handle
695  * \param space - is the space required by the operation
696  * \param flags - if the operation is write, return caller no user/group
697  *                and sync commit flags
698  *
699  * \retval 0            - success
700  * \retval -EDQUOT      - out of quota
701  * \retval -EINPROGRESS - inform client to retry write
702  * \retval -ve          - other appropriate errors
703  */
704 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
705                          struct lquota_id_info *qid, long long space,
706                          enum osd_quota_local_flags *local_flags)
707 {
708         struct lquota_entry *lqe;
709         enum osd_quota_local_flags qtype_flag = 0;
710         int rc, ret = -EINPROGRESS;
711         ENTRY;
712
713         if (qid->lqi_qentry != NULL) {
714                 /* we already had to deal with this id for this transaction */
715                 lqe = qid->lqi_qentry;
716                 if (!lqe->lqe_enforced)
717                         RETURN(0);
718         } else {
719                 /* look up lquota entry associated with qid */
720                 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
721                 if (IS_ERR(lqe))
722                         RETURN(PTR_ERR(lqe));
723                 if (!lqe->lqe_enforced) {
724                         lqe_putref(lqe);
725                         RETURN(0);
726                 }
727                 qid->lqi_qentry = lqe;
728                 /* lqe will be released in qsd_op_end() */
729         }
730
731         if (space <= 0) {
732                 /* when space is negative or null, we don't need to consume
733                  * quota space. That said, we still want to perform space
734                  * adjustments in qsd_op_end, so we return here, but with
735                  * a reference on the lqe */
736                 if (local_flags != NULL) {
737                         rc = qsd_refresh_usage(env, lqe);
738                         GOTO(out_flags, rc);
739                 }
740                 RETURN(0);
741         }
742
743         LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
744
745         lqe_write_lock(lqe);
746         lqe->lqe_waiting_write += space;
747         lqe_write_unlock(lqe);
748
749         /* acquire quota space for the operation, cap overall wait time to
750          * prevent a service thread from being stuck for too long */
751         rc = wait_event_idle_timeout(
752                 lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
753                 cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
754
755         if (rc > 0 && ret == 0) {
756                 qid->lqi_space += space;
757                 rc = 0;
758         } else {
759                 if (rc > 0)
760                         rc = ret;
761                 else if (rc == 0)
762                         rc = -ETIMEDOUT;
763
764                 LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
765
766                 lqe_write_lock(lqe);
767                 lqe->lqe_waiting_write -= space;
768
769                 if (local_flags && lqe->lqe_pending_write != 0)
770                         /* Inform OSD layer that there are pending writes.
771                          * It might want to retry after a sync if appropriate */
772                          *local_flags |= QUOTA_FL_SYNC;
773                 lqe_write_unlock(lqe);
774
775                 /* convert recoverable error into -EINPROGRESS, client will
776                  * retry */
777                 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
778                     rc == -EAGAIN || rc == -EINTR) {
779                         rc = -EINPROGRESS;
780                 } else if (rc == -ESRCH) {
781                         rc = 0;
782                         LQUOTA_ERROR(lqe, "ID isn't enforced on master, it "
783                                      "probably due to a legeal race, if this "
784                                      "message is showing up constantly, there "
785                                      "could be some inconsistence between "
786                                      "master & slave, and quota reintegration "
787                                      "needs be re-triggered.");
788                 }
789         }
790
791         if (local_flags != NULL) {
792 out_flags:
793                 LASSERT(qid->lqi_is_blk);
794                 if (rc != 0) {
795                         *local_flags |= lquota_over_fl(qqi->qqi_qtype);
796                 } else {
797                         __u64   usage;
798
799                         lqe_read_lock(lqe);
800                         usage = lqe->lqe_pending_write;
801                         usage += lqe->lqe_waiting_write;
802                         /* There is a chance to successfully grant more quota
803                          * but get edquot flag through glimpse. */
804                         if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
805                            (usage % lqe->lqe_qunit >
806                             qqi->qqi_qsd->qsd_sync_threshold)))
807                                 usage += qqi->qqi_qsd->qsd_sync_threshold;
808
809                         usage += lqe->lqe_usage;
810
811                         qtype_flag = lquota_over_fl(qqi->qqi_qtype);
812                         /* if we should notify client to start sync write */
813                         if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
814                                 *local_flags |= qtype_flag;
815                         else
816                                 *local_flags &= ~qtype_flag;
817                         lqe_read_unlock(lqe);
818                 }
819         }
820         RETURN(rc);
821 }
822
823 /**
824  * helper function comparing two lquota_id_info structures
825  */
826 static inline bool qid_equal(struct lquota_id_info *q1,
827                              struct lquota_id_info *q2)
828 {
829         if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
830                 return false;
831         return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
832 }
833
834 /**
835  * Enforce quota, it's called in the declaration of each operation.
836  * qsd_op_end() will then be called later once all the operations have been
837  * completed in order to release/adjust the quota space.
838  *
839  * \param env   - the environment passed by the caller
840  * \param qsd   - is the qsd instance associated with the device in charge of
841  *                the operation.
842  * \param trans - is the quota transaction information
843  * \param qi    - qid & space required by current operation
844  * \param flags - if the operation is write, return caller no user/group and
845  *                sync commit flags
846  *
847  * \retval 0            - success
848  * \retval -EDQUOT      - out of quota
849  * \retval -EINPROGRESS - inform client to retry write
850  * \retval -ve          - other appropriate errors
851  */
852 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
853                  struct lquota_trans *trans, struct lquota_id_info *qi,
854                  enum osd_quota_local_flags *local_flags)
855 {
856         int     i, rc;
857         bool    found = false;
858         ENTRY;
859
860         if (unlikely(qsd == NULL))
861                 RETURN(0);
862
863         if (qsd->qsd_dev->dd_rdonly)
864                 RETURN(0);
865
866         /* We don't enforce quota until the qsd_instance is started */
867         read_lock(&qsd->qsd_lock);
868         if (!qsd->qsd_started) {
869                 read_unlock(&qsd->qsd_lock);
870                 RETURN(0);
871         }
872         read_unlock(&qsd->qsd_lock);
873
874         /* ignore block quota on MDTs, ignore inode quota on OSTs */
875         if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
876             (qsd->qsd_is_md && qi->lqi_is_blk))
877                 RETURN(0);
878
879         /* ignore quota enforcement request when:
880          *    - quota isn't enforced for this quota type
881          * or - the user/group is root
882          * or - quota accounting isn't enabled */
883         if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
884             (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
885                 RETURN(0);
886
887         LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
888                  trans->lqt_id_cnt);
889         /* check whether we already allocated a slot for this id */
890         for (i = 0; i < trans->lqt_id_cnt; i++) {
891                 if (qid_equal(qi, &trans->lqt_ids[i])) {
892                         found = true;
893                         break;
894                 }
895         }
896
897         if (!found) {
898                 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
899                         CERROR("%s: more than %d qids enforced for a "
900                                "transaction?\n", qsd->qsd_svname, i);
901                         RETURN(-EINVAL);
902                 }
903
904                 /* fill new slot */
905                 trans->lqt_ids[i].lqi_id     = qi->lqi_id;
906                 trans->lqt_ids[i].lqi_type   = qi->lqi_type;
907                 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
908                 trans->lqt_id_cnt++;
909         }
910
911         /* manage quota enforcement for this ID */
912         rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
913                            &trans->lqt_ids[i], qi->lqi_space, local_flags);
914         RETURN(rc);
915 }
916 EXPORT_SYMBOL(qsd_op_begin);
917
918 /**
919  * Adjust quota space (by acquiring or releasing) hold by the quota slave.
920  * This function is called after each quota request completion and during
921  * reintegration in order to report usage or re-acquire quota locks.
922  * Space adjustment is aborted if there is already a quota request in flight
923  * for this ID.
924  *
925  * \param env    - the environment passed by the caller
926  * \param lqe    - is the qid entry to be processed
927  *
928  * \retval 0 on success, appropriate errors on failure
929  */
930 int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
931 {
932         struct qsd_thread_info  *qti = qsd_info(env);
933         struct quota_body       *qbody = &qti->qti_body;
934         struct qsd_instance     *qsd;
935         struct qsd_qtype_info   *qqi;
936         int                      rc;
937         bool                     intent = false;
938         ENTRY;
939
940         memset(qbody, 0, sizeof(*qbody));
941         rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
942         if (rc) {
943                 /* add to adjust list again to trigger adjustment later when
944                  * slave is ready */
945                 LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
946                 qsd_adjust_schedule(lqe, true, false);
947                 RETURN(0);
948         }
949
950         qqi = lqe2qqi(lqe);
951         qsd = qqi->qqi_qsd;
952
953         if (qsd->qsd_dev->dd_rdonly)
954                 RETURN(0);
955
956         lqe_write_lock(lqe);
957
958         /* fill qb_count & qb_flags */
959         if (!qsd_calc_adjust(lqe, qbody)) {
960                 lqe_write_unlock(lqe);
961                 LQUOTA_DEBUG(lqe, "no adjustment required");
962                 RETURN(0);
963         }
964
965         /* only 1 quota request in flight for a given ID is allowed */
966         rc = qsd_request_enter(lqe);
967         if (rc) {
968                 /* already a request in flight, space adjustment will be run
969                  * again on request completion */
970                 lqe_write_unlock(lqe);
971                 RETURN(0);
972         }
973
974         if (req_is_rel(qbody->qb_flags))
975                 lqe->lqe_pending_rel = qbody->qb_count;
976         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
977         lqe_write_unlock(lqe);
978
979         /* hold a refcount until completion */
980         lqe_getref(lqe);
981
982         /* fill other quota body fields */
983         qbody->qb_fid = qqi->qqi_fid;
984         qbody->qb_id  = lqe->lqe_id;
985
986         if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
987                 /* check whether we own a valid lock for this ID */
988                 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
989                 if (rc) {
990                         memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
991                         if (req_is_preacq(qbody->qb_flags)) {
992                                 if (req_has_rep(qbody->qb_flags))
993                                         /* still want to report usage */
994                                         qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
995                                 else
996                                         /* no pre-acquire if no per-ID lock */
997                                         GOTO(out, rc = -ENOLCK);
998                         } else {
999                                 /* no lock found, should use intent */
1000                                 intent = true;
1001                         }
1002                 } else if (req_is_acq(qbody->qb_flags) &&
1003                            qbody->qb_count == 0) {
1004                         /* found cached lock, no need to acquire */
1005                         GOTO(out, rc = 0);
1006                 }
1007         } else {
1008                 /* release and report don't need a per-ID lock */
1009                 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
1010         }
1011
1012         if (!intent) {
1013                 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
1014                                     qsd_req_completion, qqi, &qti->qti_lockh,
1015                                     lqe);
1016         } else {
1017                 struct lquota_lvb *lvb;
1018
1019                 OBD_ALLOC_PTR(lvb);
1020                 if (lvb == NULL)
1021                         GOTO(out, rc = -ENOMEM);
1022
1023                 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
1024                                      IT_QUOTA_DQACQ, qsd_req_completion,
1025                                      qqi, lvb, (void *)lqe);
1026         }
1027         /* the completion function will be called by qsd_send_dqacq or
1028          * qsd_intent_lock */
1029         RETURN(rc);
1030 out:
1031         qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
1032                            rc);
1033         return rc;
1034 }
1035
1036 /**
1037  * Post quota operation, pre-acquire/release quota from master.
1038  *
1039  * \param  env  - the environment passed by the caller
1040  * \param  qsd  - is the qsd instance attached to the OSD device which
1041  *                is handling the operation.
1042  * \param  qqi  - is the qsd_qtype_info structure associated with the quota ID
1043  *                subject to the operation
1044  * \param  qid  - stores information related to his ID for the operation
1045  *                which has just completed
1046  *
1047  * \retval 0    - success
1048  * \retval -ve  - failure
1049  */
1050 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
1051                         struct lquota_id_info *qid)
1052 {
1053         struct lquota_entry     *lqe;
1054         bool                     adjust;
1055         ENTRY;
1056
1057         lqe = qid->lqi_qentry;
1058         if (lqe == NULL)
1059                 RETURN_EXIT;
1060         qid->lqi_qentry = NULL;
1061
1062         /* refresh cached usage if a suitable environment is passed */
1063         if (env != NULL)
1064                 qsd_refresh_usage(env, lqe);
1065
1066         lqe_write_lock(lqe);
1067         if (qid->lqi_space > 0)
1068                 lqe->lqe_pending_write -= qid->lqi_space;
1069         if (env != NULL)
1070                 adjust = qsd_adjust_needed(lqe);
1071         else
1072                 adjust = true;
1073         lqe_write_unlock(lqe);
1074
1075         if (adjust) {
1076                 /* pre-acquire/release quota space is needed */
1077                 if (env != NULL)
1078                         qsd_adjust(env, lqe);
1079                 else
1080                         /* no suitable environment, handle adjustment in
1081                          * separate thread context */
1082                         qsd_adjust_schedule(lqe, false, false);
1083         }
1084         lqe_putref(lqe);
1085         EXIT;
1086 }
1087
1088 /**
1089  * Post quota operation. It's called after each operation transaction stopped.
1090  *
1091  * \param  env   - the environment passed by the caller
1092  * \param  qsd   - is the qsd instance associated with device which is handling
1093  *                 the operation.
1094  * \param  qids  - all qids information attached in the transaction handle
1095  * \param  count - is the number of qid entries in the qids array.
1096  *
1097  * \retval 0     - success
1098  * \retval -ve   - failure
1099  */
1100 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
1101                 struct lquota_trans *trans)
1102 {
1103         int i;
1104         ENTRY;
1105
1106         if (unlikely(qsd == NULL))
1107                 RETURN_EXIT;
1108
1109         if (qsd->qsd_dev->dd_rdonly)
1110                 RETURN_EXIT;
1111
1112         /* We don't enforce quota until the qsd_instance is started */
1113         read_lock(&qsd->qsd_lock);
1114         if (!qsd->qsd_started) {
1115                 read_unlock(&qsd->qsd_lock);
1116                 RETURN_EXIT;
1117         }
1118         read_unlock(&qsd->qsd_lock);
1119
1120         LASSERT(trans != NULL);
1121
1122         for (i = 0; i < trans->lqt_id_cnt; i++) {
1123                 struct qsd_qtype_info *qqi;
1124
1125                 if (trans->lqt_ids[i].lqi_qentry == NULL)
1126                         continue;
1127
1128                 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
1129                 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
1130         }
1131
1132         /* reset id_count to 0 so that a second accidental call to qsd_op_end()
1133          * does not result in failure */
1134         trans->lqt_id_cnt = 0;
1135         EXIT;
1136 }
1137 EXPORT_SYMBOL(qsd_op_end);
1138
1139 /* Simple wrapper on top of qsd API which implement quota transfer for osd
1140  * setattr needs. As a reminder, only the root user can change ownership of
1141  * a file, that's why EDQUOT & EINPROGRESS errors are discarded
1142  */
1143 int qsd_transfer(const struct lu_env *env, struct qsd_instance *qsd,
1144                  struct lquota_trans *trans, unsigned int qtype,
1145                  u64 orig_id, u64 new_id, u64 bspace,
1146                  struct lquota_id_info *qi)
1147 {
1148         int rc;
1149
1150         if (unlikely(!qsd))
1151                 return 0;
1152
1153         LASSERT(qtype < LL_MAXQUOTAS);
1154         if (qtype == PRJQUOTA)
1155                 if (!projid_valid(make_kprojid(&init_user_ns, new_id)))
1156                         return -EINVAL;
1157
1158         qi->lqi_type = qtype;
1159
1160         /* inode accounting */
1161         qi->lqi_is_blk = false;
1162
1163         /* one more inode for the new owner ... */
1164         qi->lqi_id.qid_uid = new_id;
1165         qi->lqi_space = 1;
1166         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1167         if (rc == -EDQUOT || rc == -EINPROGRESS)
1168                 rc = 0;
1169         if (rc)
1170                 return rc;
1171
1172         /* and one less inode for the current id */
1173         qi->lqi_id.qid_uid = orig_id;
1174         qi->lqi_space = -1;
1175         /* can't get EDQUOT when reducing usage */
1176         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1177         if (rc == -EINPROGRESS)
1178                 rc = 0;
1179         if (rc)
1180                 return rc;
1181
1182         /* block accounting */
1183         qi->lqi_is_blk = true;
1184
1185         /* more blocks for the new owner ... */
1186         qi->lqi_id.qid_uid = new_id;
1187         qi->lqi_space = bspace;
1188         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1189         if (rc == -EDQUOT || rc == -EINPROGRESS)
1190                 rc = 0;
1191         if (rc)
1192                 return rc;
1193
1194         /* and finally less blocks for the current owner */
1195         qi->lqi_id.qid_uid = orig_id;
1196         qi->lqi_space = -bspace;
1197         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
1198         /* can't get EDQUOT when reducing usage */
1199         if (rc == -EINPROGRESS)
1200                 rc = 0;
1201         return rc;
1202 }
1203 EXPORT_SYMBOL(qsd_transfer);
1204
1205 /**
1206  * Trigger pre-acquire/release if necessary.
1207  * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
1208  * quota accounting isn't updated when the transaction stopped. Instead, it'll
1209  * be updated on the final iput, so qsd_op_adjust() will be called then (in
1210  * osd_object_delete()) to trigger quota release if necessary.
1211  *
1212  * \param env - the environment passed by the caller
1213  * \param qsd - is the qsd instance associated with the device in charge
1214  *              of the operation.
1215  * \param qid - is the lquota ID of the user/group for which to trigger
1216  *              quota space adjustment
1217  * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
1218  */
1219 void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
1220                    union lquota_id *qid, int qtype)
1221 {
1222         struct lquota_entry    *lqe;
1223         struct qsd_qtype_info  *qqi;
1224         bool                    adjust;
1225         ENTRY;
1226
1227         if (unlikely(qsd == NULL))
1228                 RETURN_EXIT;
1229
1230         /* We don't enforce quota until the qsd_instance is started */
1231         read_lock(&qsd->qsd_lock);
1232         if (!qsd->qsd_started) {
1233                 read_unlock(&qsd->qsd_lock);
1234                 RETURN_EXIT;
1235         }
1236         read_unlock(&qsd->qsd_lock);
1237
1238         qqi = qsd->qsd_type_array[qtype];
1239         LASSERT(qqi);
1240
1241         if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
1242             qid->qid_uid == 0)
1243                 RETURN_EXIT;
1244
1245         read_lock(&qsd->qsd_lock);
1246         if (!qsd->qsd_started) {
1247                 read_unlock(&qsd->qsd_lock);
1248                 RETURN_EXIT;
1249         }
1250         read_unlock(&qsd->qsd_lock);
1251
1252         lqe = lqe_locate(env, qqi->qqi_site, qid);
1253         if (IS_ERR(lqe)) {
1254                 CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
1255                        qsd->qsd_svname, qid->qid_uid, qtype);
1256                 RETURN_EXIT;
1257         }
1258
1259         qsd_refresh_usage(env, lqe);
1260
1261         lqe_read_lock(lqe);
1262         adjust = qsd_adjust_needed(lqe);
1263         lqe_read_unlock(lqe);
1264
1265         if (adjust)
1266                 qsd_adjust(env, lqe);
1267
1268         lqe_putref(lqe);
1269         EXIT;
1270 }
1271 EXPORT_SYMBOL(qsd_op_adjust);
1272
1273 /**
1274  * Reserve or free quota.
1275  *
1276  * Currently, It's used to reserve quota space before changing the file's group
1277  * for normal user and free the reserved quota after the group change.
1278  *
1279  * \param env     - the environment passed by the caller
1280  * \param qsd     - is the qsd instance associated with the device in charge of
1281  *                  the operation.
1282  * \param qi      - qid & space required by current operation
1283  *
1284  * \retval 0            - success
1285  * \retval -EDQUOT      - out of quota
1286  * \retval -EINPROGRESS - inform client to retry write
1287  * \retval -ve          - other appropriate errors
1288  */
1289 int qsd_reserve_or_free_quota(const struct lu_env *env,
1290                               struct qsd_instance *qsd,
1291                               struct lquota_id_info *qi)
1292 {
1293         struct lquota_entry *lqe;
1294         struct qsd_qtype_info  *qqi;
1295         int rc = 0;
1296         bool is_free = qi->lqi_space < 0;
1297
1298         ENTRY;
1299
1300         if (unlikely(qsd == NULL))
1301                 RETURN(0);
1302
1303         if (qsd->qsd_dev->dd_rdonly)
1304                 RETURN(0);
1305
1306         if (is_free)
1307                 qi->lqi_space *= -1;
1308
1309         /* We don't enforce quota until the qsd_instance is started */
1310         read_lock(&qsd->qsd_lock);
1311         if (!qsd->qsd_started) {
1312                 read_unlock(&qsd->qsd_lock);
1313                 RETURN(0);
1314         }
1315         read_unlock(&qsd->qsd_lock);
1316
1317         qqi = qsd->qsd_type_array[qi->lqi_type];
1318         LASSERT(qqi);
1319
1320         CDEBUG(D_QUOTA, "type %s, acct %s, free %d, count %llu\n",
1321                qsd_type_enabled(qsd, qi->lqi_type) ? "enabled" : "disabled",
1322                (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed ? "failed" :
1323                "succeed", is_free, qi->lqi_space);
1324
1325         /* ignore quota enforcement request when:
1326          *    - quota isn't enforced for this quota type
1327          * or - the user/group is root
1328          * or - quota accounting isn't enabled
1329          */
1330         if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
1331             (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
1332                 RETURN(0);
1333
1334         if (is_free) {
1335                 /* look up lquota entry associated with qid */
1336                 lqe = lqe_locate(env, qqi->qqi_site, &qi->lqi_id);
1337                 if (IS_ERR(lqe))
1338                         RETURN(PTR_ERR(lqe));
1339                 if (!lqe->lqe_enforced) {
1340                         lqe_putref(lqe);
1341                         RETURN(0);
1342                 }
1343
1344                 qi->lqi_qentry = lqe;
1345
1346                 /* lqe will be put in qsd_op_end0 */
1347                 qsd_op_end0(env, qsd->qsd_type_array[qi->lqi_type], qi);
1348                 qi->lqi_qentry = NULL;
1349         } else {
1350                 /* manage quota enforcement for this ID */
1351                 rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], qi,
1352                                    qi->lqi_space, NULL);
1353
1354                 if (qi->lqi_qentry != NULL) {
1355                         lqe_putref(qi->lqi_qentry);
1356                         qi->lqi_qentry = NULL;
1357                 }
1358         }
1359
1360         CDEBUG(D_QUOTA, "%s quota: type %i, uid %llu, gid %llu, space %llu\n",
1361                is_free ? "Free" : "Reserve", qi->lqi_type, qi->lqi_id.qid_uid,
1362                qi->lqi_id.qid_gid, qi->lqi_space);
1363
1364         RETURN(rc);
1365 }
1366 EXPORT_SYMBOL(qsd_reserve_or_free_quota);