Whamcloud - gitweb
LU-7816 quota: add default quota setting support
[fs/lustre-release.git] / lustre / quota / qmt_entry.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2016, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include "qmt_internal.h"
34
35 /*
36  * Initialize qmt-specific fields of quota entry.
37  *
38  * \param lqe - is the quota entry to initialize
39  * \param arg - is the pointer to the qmt_pool_info structure
40  */
41 static void qmt_lqe_init(struct lquota_entry *lqe, void *arg)
42 {
43         LASSERT(lqe_is_master(lqe));
44
45         lqe->lqe_revoke_time = 0;
46         init_rwsem(&lqe->lqe_sem);
47 }
48
49 /* Apply the default quota setting to the specified quota entry
50  *
51  * \param env           - is the environment passed by the caller
52  * \param pool          - is the quota pool of the quota entry
53  * \param lqe           - is the lquota_entry object to apply default quota on
54  * \param create_record - if true, an global quota record will be created and
55  *                        write to the disk.
56  *
57  * \retval 0            : success
58  * \retval -ve          : other appropriate errors
59  */
60 int qmt_lqe_set_default(const struct lu_env *env, struct qmt_pool_info *pool,
61                         struct lquota_entry *lqe, bool create_record)
62 {
63         struct lquota_entry     *lqe_def;
64         int                     rc = 0;
65
66         ENTRY;
67
68         if (lqe->lqe_id.qid_uid == 0)
69                 RETURN(0);
70
71         lqe_def = pool->qpi_grace_lqe[lqe->lqe_site->lqs_qtype];
72
73         LQUOTA_DEBUG(lqe, "inherit default quota");
74
75         lqe->lqe_is_default = true;
76         lqe->lqe_hardlimit = lqe_def->lqe_hardlimit;
77         lqe->lqe_softlimit = lqe_def->lqe_softlimit;
78
79         if (create_record) {
80                 lqe->lqe_uptodate = true;
81                 rc = qmt_set_with_lqe(env, pool->qpi_qmt, lqe, 0, 0,
82                                       LQUOTA_GRACE_FLAG(0, LQUOTA_FLAG_DEFAULT),
83                                       QIF_TIMES, true, false);
84
85                 if (rc != 0)
86                         LQUOTA_ERROR(lqe, "failed to create the global quota"
87                                      " record: %d", rc);
88         }
89
90         if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
91                 lqe->lqe_enforced = false;
92         else
93                 lqe->lqe_enforced = true;
94
95         RETURN(rc);
96 }
97
98 /*
99  * Update a lquota entry. This is done by reading quota settings from the global
100  * index. The lquota entry must be write locked.
101  *
102  * \param env - the environment passed by the caller
103  * \param lqe - is the quota entry to refresh
104  * \param arg - is the pointer to the qmt_pool_info structure
105  */
106 static int qmt_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
107                         void *arg)
108 {
109         struct qmt_thread_info  *qti = qmt_info(env);
110         struct qmt_pool_info    *pool = (struct qmt_pool_info *)arg;
111         int                      rc;
112         ENTRY;
113
114         LASSERT(lqe_is_master(lqe));
115
116         /* read record from disk */
117         rc = lquota_disk_read(env, pool->qpi_glb_obj[lqe->lqe_site->lqs_qtype],
118                               &lqe->lqe_id, (struct dt_rec *)&qti->qti_glb_rec);
119
120         switch (rc) {
121         case -ENOENT:
122                 qmt_lqe_set_default(env, pool, lqe, true);
123                 break;
124         case 0:
125                 /* copy quota settings from on-disk record */
126                 lqe->lqe_granted   = qti->qti_glb_rec.qbr_granted;
127                 lqe->lqe_hardlimit = qti->qti_glb_rec.qbr_hardlimit;
128                 lqe->lqe_softlimit = qti->qti_glb_rec.qbr_softlimit;
129                 lqe->lqe_gracetime = LQUOTA_GRACE(qti->qti_glb_rec.qbr_time);
130
131                 if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0 &&
132                     (LQUOTA_FLAG(qti->qti_glb_rec.qbr_time) &
133                      LQUOTA_FLAG_DEFAULT))
134                         qmt_lqe_set_default(env, pool, lqe, false);
135                 break;
136         default:
137                 LQUOTA_ERROR(lqe, "failed to read quota entry from disk, rc:%d",
138                              rc);
139                 RETURN(rc);
140         }
141
142         if (lqe->lqe_id.qid_uid == 0 ||
143             (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0))
144                 /* {hard,soft}limit=0 means no quota enforced */
145                 lqe->lqe_enforced = false;
146         else
147                 lqe->lqe_enforced  = true;
148
149         LQUOTA_DEBUG(lqe, "read");
150         RETURN(0);
151 }
152
153 /*
154  * Print lqe information for debugging.
155  *
156  * \param lqe - is the quota entry to debug
157  * \param arg - is the pointer to the qmt_pool_info structure
158  * \param msgdata - debug message
159  * \param fmt     - format of debug message
160  */
161 static void qmt_lqe_debug(struct lquota_entry *lqe, void *arg,
162                           struct libcfs_debug_msg_data *msgdata,
163                           const char *fmt, va_list args)
164 {
165         struct qmt_pool_info    *pool = (struct qmt_pool_info *)arg;
166
167         libcfs_debug_vmsg2(msgdata, fmt, args,
168                            "qmt:%s pool:%d-%s id:%llu enforced:%d hard:%llu"
169                            " soft:%llu granted:%llu time:%llu qunit: %llu"
170                            " edquot:%d may_rel:%llu revoke:%lld default:%s\n",
171                            pool->qpi_qmt->qmt_svname,
172                            pool->qpi_key & 0x0000ffff,
173                            RES_NAME(pool->qpi_key >> 16),
174                            lqe->lqe_id.qid_uid, lqe->lqe_enforced,
175                            lqe->lqe_hardlimit, lqe->lqe_softlimit,
176                            lqe->lqe_granted, lqe->lqe_gracetime,
177                            lqe->lqe_qunit, lqe->lqe_edquot, lqe->lqe_may_rel,
178                            lqe->lqe_revoke_time,
179                            lqe->lqe_is_default ? "yes" : "no");
180 }
181
182 /*
183  * Vector of quota entry operations supported on the master
184  */
185 struct lquota_entry_operations qmt_lqe_ops = {
186         .lqe_init       = qmt_lqe_init,
187         .lqe_read       = qmt_lqe_read,
188         .lqe_debug      = qmt_lqe_debug,
189 };
190
191 /*
192  * Reserve enough credits to update records in both the global index and
193  * the slave index identified by \slv_obj
194  *
195  * \param env     - is the environment passed by the caller
196  * \param lqe     - is the quota entry associated with the identifier
197  *                  subject to the change
198  * \param slv_obj - is the dt_object associated with the index file
199  * \param restore - is a temporary storage for current quota settings which will
200  *                  be restored if something goes wrong at index update time.
201  */
202 struct thandle *qmt_trans_start_with_slv(const struct lu_env *env,
203                                          struct lquota_entry *lqe,
204                                          struct dt_object *slv_obj,
205                                          struct qmt_lqe_restore *restore)
206 {
207         struct qmt_device       *qmt;
208         struct thandle          *th;
209         int                      rc;
210         ENTRY;
211
212         LASSERT(lqe != NULL);
213         LASSERT(lqe_is_master(lqe));
214
215         qmt = lqe2qpi(lqe)->qpi_qmt;
216
217         if (slv_obj != NULL)
218                 LQUOTA_DEBUG(lqe, "declare write for slv "DFID,
219                              PFID(lu_object_fid(&slv_obj->do_lu)));
220
221         /* start transaction */
222         th = dt_trans_create(env, qmt->qmt_child);
223         if (IS_ERR(th))
224                 RETURN(th);
225
226         if (slv_obj == NULL)
227                 /* quota settings on master are updated synchronously for the
228                  * time being */
229                 th->th_sync = 1;
230
231         /* reserve credits for global index update */
232         rc = lquota_disk_declare_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id);
233         if (rc)
234                 GOTO(out, rc);
235
236         if (slv_obj != NULL) {
237                 /* reserve credits for slave index update */
238                 rc = lquota_disk_declare_write(env, th, slv_obj, &lqe->lqe_id);
239                 if (rc)
240                         GOTO(out, rc);
241         }
242
243         /* start transaction */
244         rc = dt_trans_start_local(env, qmt->qmt_child, th);
245         if (rc)
246                 GOTO(out, rc);
247
248         EXIT;
249 out:
250         if (rc) {
251                 dt_trans_stop(env, qmt->qmt_child, th);
252                 th = ERR_PTR(rc);
253                 LQUOTA_ERROR(lqe, "failed to slv declare write for "DFID
254                              ", rc:%d", PFID(lu_object_fid(&slv_obj->do_lu)),
255                              rc);
256         } else {
257                 restore->qlr_hardlimit = lqe->lqe_hardlimit;
258                 restore->qlr_softlimit = lqe->lqe_softlimit;
259                 restore->qlr_gracetime = lqe->lqe_gracetime;
260                 restore->qlr_granted   = lqe->lqe_granted;
261                 restore->qlr_qunit     = lqe->lqe_qunit;
262         }
263         return th;
264 }
265
266 /*
267  * Reserve enough credits to update a record in the global index
268  *
269  * \param env     - is the environment passed by the caller
270  * \param lqe     - is the quota entry to be modified in the global index
271  * \param restore - is a temporary storage for current quota settings which will
272  *                  be restored if something goes wrong at index update time.
273  */
274 struct thandle *qmt_trans_start(const struct lu_env *env,
275                                 struct lquota_entry *lqe,
276                                 struct qmt_lqe_restore *restore)
277 {
278         LQUOTA_DEBUG(lqe, "declare write");
279         return qmt_trans_start_with_slv(env, lqe, NULL, restore);
280 }
281
282 /*
283  * Update record associated with a quota entry in the global index.
284  * If LQUOTA_BUMP_VER is set, then the global index version must also be
285  * bumped.
286  * The entry must be at least read locked, dirty and up-to-date.
287  *
288  * \param env   - the environment passed by the caller
289  * \param th    - is the transaction handle to be used for the disk writes
290  * \param lqe   - is the quota entry to udpate
291  * \param obj   - is the dt_object associated with the index file
292  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
293  * \param ver   - is used to return the new version of the index.
294  *
295  * \retval      - 0 on success and lqe dirty flag cleared,
296  *                appropriate error on failure and uptodate flag cleared.
297  */
298 int qmt_glb_write(const struct lu_env *env, struct thandle *th,
299                   struct lquota_entry *lqe, __u32 flags, __u64 *ver)
300 {
301         struct qmt_thread_info  *qti = qmt_info(env);
302         struct lquota_glb_rec   *rec;
303         int                      rc;
304         ENTRY;
305
306         LASSERT(lqe != NULL);
307         LASSERT(lqe_is_master(lqe));
308         LASSERT(lqe_is_locked(lqe));
309         LASSERT(lqe->lqe_uptodate);
310         LASSERT((flags & ~(LQUOTA_BUMP_VER | LQUOTA_SET_VER)) == 0);
311
312         LQUOTA_DEBUG(lqe, "write glb");
313
314         /* never delete the entry even when the id isn't enforced and
315          * no any guota granted, otherwise, this entry will not be
316          * synced to slave during the reintegration. */
317         rec = &qti->qti_glb_rec;
318
319         /* fill global index with updated quota settings */
320         rec->qbr_granted   = lqe->lqe_granted;
321         if (lqe->lqe_is_default) {
322                 rec->qbr_hardlimit = 0;
323                 rec->qbr_softlimit = 0;
324                 rec->qbr_time      = LQUOTA_GRACE_FLAG(0, LQUOTA_FLAG_DEFAULT);
325         } else {
326                 rec->qbr_hardlimit = lqe->lqe_hardlimit;
327                 rec->qbr_softlimit = lqe->lqe_softlimit;
328                 rec->qbr_time      = lqe->lqe_gracetime;
329         }
330
331         /* write new quota settings */
332         rc = lquota_disk_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id,
333                                (struct dt_rec *)rec, flags, ver);
334         if (rc)
335                 /* we failed to write the new quota settings to disk, report
336                  * error to caller who will restore the initial value */
337                 LQUOTA_ERROR(lqe, "failed to update global index, rc:%d", rc);
338
339         RETURN(rc);
340 }
341
342 /*
343  * Read from disk how much quota space is allocated to a slave.
344  * This is done by reading records from the dedicated slave index file.
345  * Return in \granted how much quota space is currently allocated to the
346  * slave.
347  * The entry must be at least read locked.
348  *
349  * \param env - the environment passed by the caller
350  * \param lqe - is the quota entry associated with the identifier to look-up
351  *              in the slave index
352  * \param slv_obj - is the dt_object associated with the slave index
353  * \param granted - is the output parameter where to return how much space
354  *                  is granted to the slave.
355  *
356  * \retval    - 0 on success, appropriate error on failure
357  */
358 int qmt_slv_read(const struct lu_env *env, struct lquota_entry *lqe,
359                  struct dt_object *slv_obj, __u64 *granted)
360 {
361         struct qmt_thread_info  *qti = qmt_info(env);
362         struct lquota_slv_rec   *slv_rec = &qti->qti_slv_rec;
363         int                      rc;
364         ENTRY;
365
366         LASSERT(lqe != NULL);
367         LASSERT(lqe_is_master(lqe));
368         LASSERT(lqe_is_locked(lqe));
369
370         LQUOTA_DEBUG(lqe, "read slv "DFID,
371                      PFID(lu_object_fid(&slv_obj->do_lu)));
372
373         /* read slave record from disk */
374         rc = lquota_disk_read(env, slv_obj, &lqe->lqe_id,
375                               (struct dt_rec *)slv_rec);
376         switch (rc) {
377         case -ENOENT:
378                 *granted = 0;
379                 break;
380         case 0:
381                 /* extract granted from on-disk record */
382                 *granted = slv_rec->qsr_granted;
383                 break;
384         default:
385                 LQUOTA_ERROR(lqe, "failed to read slave record "DFID,
386                              PFID(lu_object_fid(&slv_obj->do_lu)));
387                 RETURN(rc);
388         }
389
390         LQUOTA_DEBUG(lqe, "successful slv read %llu", *granted);
391
392         RETURN(0);
393 }
394
395 /*
396  * Update record in slave index file.
397  * The entry must be at least read locked.
398  *
399  * \param env - the environment passed by the caller
400  * \param th  - is the transaction handle to be used for the disk writes
401  * \param lqe - is the dirty quota entry which will be updated at the same time
402  *              as the slave index
403  * \param slv_obj - is the dt_object associated with the slave index
404  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
405  * \param ver   - is used to return the new version of the index.
406  * \param granted - is the new amount of quota space owned by the slave
407  *
408  * \retval    - 0 on success, appropriate error on failure
409  */
410 int qmt_slv_write(const struct lu_env *env, struct thandle *th,
411                   struct lquota_entry *lqe, struct dt_object *slv_obj,
412                   __u32 flags, __u64 *ver, __u64 granted)
413 {
414         struct qmt_thread_info  *qti = qmt_info(env);
415         struct lquota_slv_rec   *rec;
416         int                      rc;
417         ENTRY;
418
419         LASSERT(lqe != NULL);
420         LASSERT(lqe_is_master(lqe));
421         LASSERT(lqe_is_locked(lqe));
422
423         LQUOTA_DEBUG(lqe, "write slv "DFID" granted:%llu",
424                      PFID(lu_object_fid(&slv_obj->do_lu)), granted);
425
426         /* never delete the entry, otherwise, it'll not be transferred
427          * to slave during reintegration. */
428         rec = &qti->qti_slv_rec;
429
430         /* updated space granted to this slave */
431         rec->qsr_granted = granted;
432
433         /* write new granted space */
434         rc = lquota_disk_write(env, th, slv_obj, &lqe->lqe_id,
435                                (struct dt_rec *)rec, flags, ver);
436         if (rc) {
437                 LQUOTA_ERROR(lqe, "failed to update slave index "DFID" granted:"
438                              "%llu", PFID(lu_object_fid(&slv_obj->do_lu)),
439                              granted);
440                 RETURN(rc);
441         }
442
443         RETURN(0);
444 }
445
446 /*
447  * Check whether new limits are valid for this pool
448  *
449  * \param lqe  - is the quota entry subject to the setquota
450  * \param hard - is the new hard limit
451  * \param soft - is the new soft limit
452  */
453 int qmt_validate_limits(struct lquota_entry *lqe, __u64 hard, __u64 soft)
454 {
455         ENTRY;
456
457         if (hard != 0 && soft > hard)
458                 /* soft limit must be less than hard limit */
459                 RETURN(-EINVAL);
460         RETURN(0);
461 }
462
463 /*
464  * Set/clear edquot flag after quota space allocation/release or settings
465  * change. Slaves will be notified of changes via glimpse on per-ID lock
466  *
467  * \param lqe - is the quota entry to check
468  * \param now - is the current time in second used for grace time managment
469  */
470 void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
471 {
472         struct qmt_pool_info    *pool = lqe2qpi(lqe);
473         ENTRY;
474
475         if (!lqe->lqe_enforced || lqe->lqe_id.qid_uid == 0)
476                 RETURN_EXIT;
477
478         if (!lqe->lqe_edquot) {
479                 /* space exhausted flag not set, let's check whether it is time
480                  * to set the flag */
481
482                 if (!qmt_space_exhausted(lqe, now))
483                         /* the qmt still has available space */
484                         RETURN_EXIT;
485
486                 /* See comment in qmt_adjust_qunit(). LU-4139 */
487                 if (qmt_hard_exhausted(lqe) ||
488                     pool->qpi_key >> 16 != LQUOTA_RES_DT) {
489                         time64_t lapse;
490
491                         /* we haven't reached the minimal qunit yet so there is
492                          * still hope that the rebalancing process might free
493                          * up some quota space */
494                         if (lqe->lqe_qunit != pool->qpi_least_qunit)
495                                 RETURN_EXIT;
496
497                         /* least qunit value not sent to all slaves yet */
498                         if (lqe->lqe_revoke_time == 0)
499                                 RETURN_EXIT;
500
501                         /* Let's give more time to slave to release space */
502                         lapse = ktime_get_seconds() - QMT_REBA_TIMEOUT;
503                         if (lqe->lqe_may_rel != 0 && lqe->lqe_revoke_time > lapse)
504                                 RETURN_EXIT;
505                 } else {
506                         if (lqe->lqe_qunit > pool->qpi_soft_least_qunit)
507                                 RETURN_EXIT;
508                 }
509
510                 /* set edquot flag */
511                 lqe->lqe_edquot = true;
512         } else {
513                 /* space exhausted flag set, let's check whether it is time to
514                  * clear it */
515
516                 if (qmt_space_exhausted(lqe, now))
517                         /* the qmt still has not space */
518                         RETURN_EXIT;
519
520                 if (lqe->lqe_hardlimit != 0 &&
521                     lqe->lqe_granted + pool->qpi_least_qunit >
522                                                         lqe->lqe_hardlimit)
523                         /* we clear the flag only once at least one least qunit
524                          * is available */
525                         RETURN_EXIT;
526
527                 /* clear edquot flag */
528                 lqe->lqe_edquot = false;
529         }
530
531         LQUOTA_DEBUG(lqe, "changing edquot flag");
532
533         /* let's notify slave by issuing glimpse on per-ID lock.
534          * the rebalance thread will take care of this */
535         qmt_id_lock_notify(pool->qpi_qmt, lqe);
536         EXIT;
537 }
538
539 /* Using least_qunit when over block softlimit will seriously impact the
540  * write performance, we need to do some special tweaking on that. */
541 static __u64 qmt_calc_softlimit(struct lquota_entry *lqe, bool *oversoft)
542 {
543         struct qmt_pool_info *pool = lqe2qpi(lqe);
544
545         LASSERT(lqe->lqe_softlimit != 0);
546         *oversoft = false;
547         /* No need to do special tweaking for inode limit */
548         if (pool->qpi_key >> 16 != LQUOTA_RES_DT)
549                 return lqe->lqe_softlimit;
550
551         if (lqe->lqe_granted <= lqe->lqe_softlimit +
552                                 pool->qpi_soft_least_qunit) {
553                 return lqe->lqe_softlimit;
554         } else if (lqe->lqe_hardlimit != 0) {
555                 *oversoft = true;
556                 return lqe->lqe_hardlimit;
557         } else {
558                 *oversoft = true;
559                 return 0;
560         }
561 }
562
563 /*
564  * Try to grant more quota space back to slave.
565  *
566  * \param lqe     - is the quota entry for which we would like to allocate more
567  *                  space
568  * \param granted - is how much was already granted as part of the request
569  *                  processing
570  * \param spare   - is how much unused quota space the slave already owns
571  *
572  * \retval return how additional space can be granted to the slave
573  */
574 __u64 qmt_alloc_expand(struct lquota_entry *lqe, __u64 granted, __u64 spare)
575 {
576         struct qmt_pool_info    *pool = lqe2qpi(lqe);
577         __u64                    remaining, qunit;
578         int                      slv_cnt;
579
580         LASSERT(lqe->lqe_enforced && lqe->lqe_qunit != 0);
581
582         slv_cnt = lqe2qpi(lqe)->qpi_slv_nr[lqe->lqe_site->lqs_qtype];
583         qunit   = lqe->lqe_qunit;
584
585         /* See comment in qmt_adjust_qunit(). LU-4139. */
586         if (lqe->lqe_softlimit != 0) {
587                 bool oversoft;
588                 remaining = qmt_calc_softlimit(lqe, &oversoft);
589                 if (remaining == 0)
590                         remaining = lqe->lqe_granted +
591                                     pool->qpi_soft_least_qunit;
592         } else {
593                 remaining = lqe->lqe_hardlimit;
594         }
595
596         if (lqe->lqe_granted >= remaining)
597                 RETURN(0);
598
599         remaining -= lqe->lqe_granted;
600
601         do {
602                 if (spare >= qunit)
603                         break;
604
605                 granted &= (qunit - 1);
606
607                 if (remaining > (slv_cnt * qunit) >> 1) {
608                         /* enough room to grant more space w/o additional
609                          * shrinking ... at least for now */
610                         remaining -= (slv_cnt * qunit) >> 1;
611                 } else if (qunit != pool->qpi_least_qunit) {
612                         qunit >>= 2;
613                         continue;
614                 }
615
616                 granted &= (qunit - 1);
617                 if (spare > 0)
618                         RETURN(min_t(__u64, qunit - spare, remaining));
619                 else
620                         RETURN(min_t(__u64, qunit - granted, remaining));
621         } while (qunit >= pool->qpi_least_qunit);
622
623         RETURN(0);
624 }
625
626 /*
627  * Adjust qunit size according to quota limits and total granted count.
628  * The caller must have locked the lqe.
629  *
630  * \param env - the environment passed by the caller
631  * \param lqe - is the qid entry to be adjusted
632  */
633 void qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
634 {
635         struct qmt_pool_info    *pool = lqe2qpi(lqe);
636         int                      slv_cnt;
637         __u64                    qunit, limit, qunit2 = 0;
638         ENTRY;
639
640         LASSERT(lqe_is_locked(lqe));
641
642         if (!lqe->lqe_enforced || lqe->lqe_id.qid_uid == 0)
643                 /* no quota limits */
644                 RETURN_EXIT;
645
646         /* record how many slaves have already registered */
647         slv_cnt = pool->qpi_slv_nr[lqe->lqe_site->lqs_qtype];
648         if (slv_cnt == 0)
649                 /* wait for at least one slave to join */
650                 RETURN_EXIT;
651
652         /* Qunit calculation is based on soft limit, if any, hard limit
653          * otherwise. This means that qunit is shrunk to the minimum when
654          * beyond the soft limit. This will impact performance, but that's the
655          * price of an accurate grace time management. */
656         if (lqe->lqe_softlimit != 0) {
657                 bool oversoft;
658                 /* As a compromise of write performance and the grace time
659                  * accuracy, the block qunit size will be shrunk to
660                  * qpi_soft_least_qunit when over softlimit. LU-4139. */
661                 limit = qmt_calc_softlimit(lqe, &oversoft);
662                 if (oversoft)
663                         qunit2 = pool->qpi_soft_least_qunit;
664                 if (limit == 0)
665                         GOTO(done, qunit = qunit2);
666         } else if (lqe->lqe_hardlimit != 0) {
667                 limit = lqe->lqe_hardlimit;
668         } else {
669                 LQUOTA_ERROR(lqe, "enforced bit set, but neither hard nor soft "
670                              "limit are set");
671                 RETURN_EXIT;
672         }
673
674         qunit = lqe->lqe_qunit == 0 ? pool->qpi_least_qunit : lqe->lqe_qunit;
675
676         /* The qunit value is computed as follows: limit / (2 * slv_cnt).
677          * Then 75% of the quota space can be granted with current qunit value.
678          * The remaining 25% are then used with reduced qunit size (by a factor
679          * of 4) which is then divided in a similar manner.
680          *
681          * |---------------------limit---------------------|
682          * |-------limit / 2-------|-limit / 4-|-limit / 4-|
683          * |qunit|qunit|qunit|qunit|           |           |
684          * |----slv_cnt * qunit----|           |           |
685          * |-grow limit-|          |           |           |
686          * |--------------shrink limit---------|           |
687          * |---space granted in qunit chunks---|-remaining-|
688          *                                    /             \
689          *                                   /               \
690          *                                  /                 \
691          *                                 /                   \
692          *                                /                     \
693          *     qunit >>= 2;            |qunit*slv_cnt|qunit*slv_cnt|
694          *                             |---space in qunit---|remain|
695          *                                  ...                               */
696         if (qunit == pool->qpi_least_qunit ||
697             limit >= lqe->lqe_granted + ((slv_cnt * qunit) >> 1)) {
698                 /* current qunit value still fits, let's see if we can afford to
699                  * increase qunit now ...
700                  * To increase qunit again, we have to be under 25% */
701                 while (qunit && limit >= lqe->lqe_granted + 6 * qunit * slv_cnt)
702                         qunit <<= 2;
703
704                 if (!qunit) {
705                         qunit = limit;
706                         do_div(qunit, 2 * slv_cnt);
707                 }
708
709         } else {
710                 /* shrink qunit until we find a suitable value */
711                 while (qunit > pool->qpi_least_qunit &&
712                        limit < lqe->lqe_granted + ((slv_cnt * qunit) >> 1))
713                         qunit >>= 2;
714         }
715
716         if (qunit2 && qunit > qunit2)
717                 qunit = qunit2;
718 done:
719         if (lqe->lqe_qunit == qunit)
720                 /* keep current qunit */
721                 RETURN_EXIT;
722
723         LQUOTA_DEBUG(lqe, "%s qunit to %llu",
724                      lqe->lqe_qunit < qunit ? "increasing" : "decreasing",
725                      qunit);
726
727         /* store new qunit value */
728         swap(lqe->lqe_qunit, qunit);
729
730         /* reset revoke time */
731         lqe->lqe_revoke_time = 0;
732
733         if (lqe->lqe_qunit < qunit)
734                 /* let's notify slave of qunit shrinking */
735                 qmt_id_lock_notify(pool->qpi_qmt, lqe);
736         else if (lqe->lqe_qunit == pool->qpi_least_qunit)
737                 /* initial qunit value is the smallest one */
738                 lqe->lqe_revoke_time = ktime_get_seconds();
739         EXIT;
740 }
741
742 /*
743  * Adjust qunit & edquot flag in case it wasn't initialized already (e.g.
744  * limit set while no slaves were connected yet)
745  */
746 void qmt_revalidate(const struct lu_env *env, struct lquota_entry *lqe)
747 {
748         if (lqe->lqe_qunit == 0) {
749                 /* lqe was read from disk, but neither qunit, nor edquot flag
750                  * were initialized */
751                 qmt_adjust_qunit(env, lqe);
752                 if (lqe->lqe_qunit != 0)
753                         qmt_adjust_edquot(lqe, ktime_get_real_seconds());
754         }
755 }