Whamcloud - gitweb
LU-16339 quota: notify OSTs until lge_qunit_nu is set
[fs/lustre-release.git] / lustre / quota / qmt_entry.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2016, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include "qmt_internal.h"
34
35 /*
36  * Initialize qmt-specific fields of quota entry.
37  *
38  * \param lqe - is the quota entry to initialize
39  * \param arg - is the pointer to the qmt_pool_info structure
40  */
41 static void qmt_lqe_init(struct lquota_entry *lqe, void *arg)
42 {
43         LASSERT(lqe_is_master(lqe));
44
45         lqe->lqe_revoke_time = 0;
46         init_rwsem(&lqe->lqe_sem);
47         mutex_init(&lqe->lqe_glbl_data_lock);
48 }
49
50 /* Apply the default quota setting to the specified quota entry
51  *
52  * \param env           - is the environment passed by the caller
53  * \param pool          - is the quota pool of the quota entry
54  * \param lqe           - is the lquota_entry object to apply default quota on
55  * \param create_record - if true, an global quota record will be created and
56  *                        write to the disk.
57  *
58  * \retval 0            : success
59  * \retval -ve          : other appropriate errors
60  */
61 int qmt_lqe_set_default(const struct lu_env *env, struct qmt_pool_info *pool,
62                         struct lquota_entry *lqe, bool create_record)
63 {
64         struct lquota_entry     *lqe_def;
65         int                     rc = 0;
66
67         ENTRY;
68
69         if (lqe->lqe_id.qid_uid == 0)
70                 RETURN(0);
71
72         lqe_def = pool->qpi_grace_lqe[lqe_qtype(lqe)];
73
74         LQUOTA_DEBUG(lqe, "inherit default quota");
75
76         lqe->lqe_is_default = true;
77         lqe->lqe_hardlimit = lqe_def->lqe_hardlimit;
78         lqe->lqe_softlimit = lqe_def->lqe_softlimit;
79
80         if (create_record) {
81                 lqe->lqe_uptodate = true;
82                 rc = qmt_set_with_lqe(env, pool->qpi_qmt, lqe, 0, 0,
83                                       LQUOTA_GRACE_FLAG(0, LQUOTA_FLAG_DEFAULT),
84                                       QIF_TIMES, true, false);
85
86                 if (rc != 0)
87                         LQUOTA_ERROR(lqe, "failed to create the global quota"
88                                      " record: %d", rc);
89         }
90
91         if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
92                 lqe->lqe_enforced = false;
93         else
94                 lqe->lqe_enforced = true;
95
96         RETURN(rc);
97 }
98
99 /*
100  * Update a lquota entry. This is done by reading quota settings from the global
101  * index. The lquota entry must be write locked.
102  *
103  * \param env - the environment passed by the caller
104  * \param lqe - is the quota entry to refresh
105  * \param arg - is the pointer to the qmt_pool_info structure
106  * \param find - don't create lqe on disk in case of ENOENT if true
107  */
108 static int qmt_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
109                         void *arg, bool find)
110 {
111         struct qmt_thread_info  *qti = qmt_info(env);
112         struct qmt_pool_info    *pool = (struct qmt_pool_info *)arg;
113         int                      rc;
114         ENTRY;
115
116         LASSERT(lqe_is_master(lqe));
117
118         /* read record from disk */
119         rc = lquota_disk_read(env, pool->qpi_glb_obj[lqe->lqe_site->lqs_qtype],
120                               &lqe->lqe_id, (struct dt_rec *)&qti->qti_glb_rec);
121
122         switch (rc) {
123         case -ENOENT:
124                 if (find)
125                         RETURN(-ENOENT);
126                 qmt_lqe_set_default(env, pool, lqe, true);
127                 break;
128         case 0:
129                 /* copy quota settings from on-disk record */
130                 lqe->lqe_granted   = qti->qti_glb_rec.qbr_granted;
131                 lqe->lqe_hardlimit = qti->qti_glb_rec.qbr_hardlimit;
132                 lqe->lqe_softlimit = qti->qti_glb_rec.qbr_softlimit;
133                 lqe->lqe_gracetime = LQUOTA_GRACE(qti->qti_glb_rec.qbr_time);
134
135                 if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0 &&
136                     (LQUOTA_FLAG(qti->qti_glb_rec.qbr_time) &
137                      LQUOTA_FLAG_DEFAULT))
138                         qmt_lqe_set_default(env, pool, lqe, false);
139                 else if (LQUOTA_FLAG(qti->qti_glb_rec.qbr_time) &
140                                                         LQUOTA_FLAG_RESET)
141                         lqe->lqe_is_reset = true;
142                 break;
143         default:
144                 LQUOTA_ERROR(lqe, "failed to read quota entry from disk, rc:%d",
145                              rc);
146                 RETURN(rc);
147         }
148
149         if (lqe->lqe_id.qid_uid == 0 ||
150             (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0))
151                 /* {hard,soft}limit=0 means no quota enforced */
152                 lqe->lqe_enforced = false;
153         else
154                 lqe->lqe_enforced  = true;
155
156         if (qmt_pool_global(pool))
157                 lqe->lqe_is_global = 1;
158
159         LQUOTA_DEBUG(lqe, "read");
160         RETURN(0);
161 }
162
163 /*
164  * Print lqe information for debugging.
165  *
166  * \param lqe - is the quota entry to debug
167  * \param arg - is the pointer to the qmt_pool_info structure
168  * \param msgdata - debug message
169  * \param fmt     - format of debug message
170  */
171 static void qmt_lqe_debug(struct lquota_entry *lqe, void *arg,
172                           struct libcfs_debug_msg_data *msgdata,
173                           struct va_format *vaf)
174 {
175         struct qmt_pool_info    *pool = (struct qmt_pool_info *)arg;
176
177         libcfs_debug_msg(msgdata,
178                          "%pV qmt:%s pool:%s-%s id:%llu enforced:%d hard:%llu soft:%llu granted:%llu time:%llu qunit: %llu edquot:%d may_rel:%llu revoke:%lld default:%s\n",
179                          vaf, pool->qpi_qmt->qmt_svname,
180                          RES_NAME(pool->qpi_rtype),
181                          pool->qpi_name,
182                          lqe->lqe_id.qid_uid, lqe->lqe_enforced,
183                          lqe->lqe_hardlimit, lqe->lqe_softlimit,
184                          lqe->lqe_granted, lqe->lqe_gracetime,
185                          lqe->lqe_qunit, lqe->lqe_edquot, lqe->lqe_may_rel,
186                          lqe->lqe_revoke_time,
187                          lqe->lqe_is_default ? "yes" : "no");
188 }
189
190 /*
191  * Vector of quota entry operations supported on the master
192  */
193 const struct lquota_entry_operations qmt_lqe_ops = {
194         .lqe_init       = qmt_lqe_init,
195         .lqe_read       = qmt_lqe_read,
196         .lqe_debug      = qmt_lqe_debug,
197 };
198
199 /*
200  * Reserve enough credits to update records in both the global index and
201  * the slave index identified by \slv_obj
202  *
203  * \param env     - is the environment passed by the caller
204  * \param lqe     - is the quota entry associated with the identifier
205  *                  subject to the change. If it is NULL lqes array is
206  *                  taken from env with qti_lqes_env(env).
207  * \param slv_obj - is the dt_object associated with the index file
208  * \param sync    - make transaction sync if true
209  */
210 struct thandle *qmt_trans_start_with_slv(const struct lu_env *env,
211                                          struct lquota_entry *lqe,
212                                          struct dt_object *slv_obj,
213                                          bool sync)
214 {
215         struct qmt_device       *qmt;
216         struct thandle          *th;
217         struct lquota_entry     **lqes;
218         struct qmt_lqe_restore  *restore;
219         int                      rc, i, lqes_cnt;
220         ENTRY;
221
222         restore = qti_lqes_rstr(env);
223         if (!lqe) {
224                 lqes_cnt = qti_lqes_cnt(env);
225                 lqes = qti_lqes(env);
226         } else {
227                 lqes_cnt = 1;
228                 lqes = &lqe;
229         }
230
231         /* qmt is the same for all lqes, so take it from the 1st */
232         qmt = lqe2qpi(lqes[0])->qpi_qmt;
233
234         if (slv_obj != NULL)
235                 LQUOTA_DEBUG(lqes[0], "declare write for slv "DFID,
236                              PFID(lu_object_fid(&slv_obj->do_lu)));
237
238         /* start transaction */
239         th = dt_trans_create(env, qmt->qmt_child);
240         if (IS_ERR(th))
241                 RETURN(th);
242
243         if (sync)
244                 /* quota settings on master are updated synchronously for the
245                  * time being */
246                 th->th_sync = 1;
247
248         /* reserve credits for global index update */
249         for (i = 0; i < lqes_cnt; i++) {
250                 rc = lquota_disk_declare_write(env, th,
251                                                LQE_GLB_OBJ(lqes[i]),
252                                                &lqes[i]->lqe_id);
253                 if (rc)
254                         GOTO(out, rc);
255         }
256
257         if (slv_obj != NULL) {
258                 /* reserve credits for slave index update */
259                 rc = lquota_disk_declare_write(env, th, slv_obj,
260                                                &lqes[0]->lqe_id);
261                 if (rc)
262                         GOTO(out, rc);
263         }
264
265         /* start transaction */
266         rc = dt_trans_start_local(env, qmt->qmt_child, th);
267         if (rc)
268                 GOTO(out, rc);
269
270         EXIT;
271 out:
272         if (rc) {
273                 dt_trans_stop(env, qmt->qmt_child, th);
274                 th = ERR_PTR(rc);
275                 LQUOTA_ERROR(lqes[0], "failed to slv declare write for "DFID
276                              ", rc:%d", PFID(lu_object_fid(&slv_obj->do_lu)),
277                              rc);
278         } else {
279                 for (i = 0; i < lqes_cnt; i++) {
280                         restore[i].qlr_hardlimit = lqes[i]->lqe_hardlimit;
281                         restore[i].qlr_softlimit = lqes[i]->lqe_softlimit;
282                         restore[i].qlr_gracetime = lqes[i]->lqe_gracetime;
283                         restore[i].qlr_granted   = lqes[i]->lqe_granted;
284                         restore[i].qlr_qunit     = lqes[i]->lqe_qunit;
285                 }
286         }
287         return th;
288 }
289
290 /*
291  * Reserve enough credits to update a record in the global index
292  *
293  * \param env     - is the environment passed by the caller
294  * \param lqe     - is the quota entry to be modified in the global index
295  * \param restore - is a temporary storage for current quota settings which will
296  *                  be restored if something goes wrong at index update time.
297  */
298 struct thandle *qmt_trans_start(const struct lu_env *env,
299                                 struct lquota_entry *lqe)
300 {
301         LQUOTA_DEBUG(lqe, "declare write");
302         return qmt_trans_start_with_slv(env, lqe, NULL, true);
303 }
304
305 int qmt_glb_write_lqes(const struct lu_env *env, struct thandle *th,
306                        __u32 flags, __u64 *ver)
307 {
308         int i, rc = 0;
309
310         for (i = 0; i < qti_lqes_cnt(env); i++) {
311                 rc = qmt_glb_write(env, th, qti_lqes(env)[i], flags, ver);
312                 if (rc)
313                         break;
314         }
315         return rc;
316 }
317
318 /*
319  * Update record associated with a quota entry in the global index.
320  * If LQUOTA_BUMP_VER is set, then the global index version must also be
321  * bumped.
322  * The entry must be at least read locked, dirty and up-to-date.
323  *
324  * \param env   - the environment passed by the caller
325  * \param th    - is the transaction handle to be used for the disk writes
326  * \param lqe   - is the quota entry to udpate
327  * \param obj   - is the dt_object associated with the index file
328  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
329  * \param ver   - is used to return the new version of the index.
330  *
331  * \retval      - 0 on success and lqe dirty flag cleared,
332  *                appropriate error on failure and uptodate flag cleared.
333  */
334 int qmt_glb_write(const struct lu_env *env, struct thandle *th,
335                   struct lquota_entry *lqe, __u32 flags, __u64 *ver)
336 {
337         struct qmt_thread_info  *qti = qmt_info(env);
338         struct lquota_glb_rec   *rec;
339         int                      rc;
340         ENTRY;
341
342         LASSERT(lqe != NULL);
343         LASSERT(lqe_is_master(lqe));
344         LASSERT(lqe_is_locked(lqe));
345         LASSERT(lqe->lqe_uptodate);
346         LASSERT((flags & ~(LQUOTA_BUMP_VER | LQUOTA_SET_VER)) == 0);
347
348         LQUOTA_DEBUG(lqe, "write glb");
349
350         /* never delete the entry even when the id isn't enforced and
351          * no any guota granted, otherwise, this entry will not be
352          * synced to slave during the reintegration. */
353         rec = &qti->qti_glb_rec;
354
355         /* fill global index with updated quota settings */
356         rec->qbr_granted   = lqe->lqe_granted;
357         if (lqe->lqe_is_default) {
358                 rec->qbr_hardlimit = 0;
359                 rec->qbr_softlimit = 0;
360                 rec->qbr_time      = LQUOTA_GRACE_FLAG(lqe->lqe_gracetime,
361                                                        LQUOTA_FLAG_DEFAULT);
362         } else if (lqe->lqe_is_reset) {
363                 rec->qbr_hardlimit = 0;
364                 rec->qbr_softlimit = 0;
365                 rec->qbr_granted = 0;
366                 rec->qbr_time      = LQUOTA_GRACE_FLAG(lqe->lqe_gracetime,
367                                                        LQUOTA_FLAG_RESET);
368         } else {
369                 rec->qbr_hardlimit = lqe->lqe_hardlimit;
370                 rec->qbr_softlimit = lqe->lqe_softlimit;
371                 rec->qbr_time      = lqe->lqe_gracetime;
372         }
373
374         /* write new quota settings */
375         rc = lquota_disk_write(env, th, LQE_GLB_OBJ(lqe), &lqe->lqe_id,
376                                (struct dt_rec *)rec, flags, ver);
377         if (rc)
378                 /* we failed to write the new quota settings to disk, report
379                  * error to caller who will restore the initial value */
380                 LQUOTA_ERROR(lqe, "failed to update global index, rc:%d", rc);
381
382         RETURN(rc);
383 }
384
385 /*
386  * Read from disk how much quota space is allocated to a slave.
387  * This is done by reading records from the dedicated slave index file.
388  * Return in \granted how much quota space is currently allocated to the
389  * slave.
390  * The entry must be at least read locked.
391  *
392  * \param env - the environment passed by the caller
393  * \param lqe_id - is the quota id associated with the identifier to look-up
394  *              in the slave index
395  * \param slv_obj - is the dt_object associated with the slave index
396  * \param granted - is the output parameter where to return how much space
397  *                  is granted to the slave.
398  *
399  * \retval    - 0 on success, appropriate error on failure
400  */
401 int qmt_slv_read(const struct lu_env *env, union lquota_id *qid,
402                  struct dt_object *slv_obj, __u64 *granted)
403 {
404         struct qmt_thread_info  *qti = qmt_info(env);
405         struct lquota_slv_rec   *slv_rec = &qti->qti_slv_rec;
406         int                      rc;
407         ENTRY;
408
409         CDEBUG(D_QUOTA, "read id:%llu form slv "DFID"\n",
410                qid->qid_uid, PFID(lu_object_fid(&slv_obj->do_lu)));
411
412         /* read slave record from disk */
413         rc = lquota_disk_read(env, slv_obj, qid,
414                               (struct dt_rec *)slv_rec);
415         switch (rc) {
416         case -ENOENT:
417                 *granted = 0;
418                 break;
419         case 0:
420                 /* extract granted from on-disk record */
421                 *granted = slv_rec->qsr_granted;
422                 break;
423         default:
424                 CERROR("Failed to read slave record for %llu from "DFID"\n",
425                        qid->qid_uid, PFID(lu_object_fid(&slv_obj->do_lu)));
426                 RETURN(rc);
427         }
428
429         CDEBUG(D_QUOTA, "Successful slv read %llu\n", *granted);
430
431         RETURN(0);
432 }
433
434 /*
435  * Update record in slave index file.
436  * The entry must be at least read locked.
437  *
438  * \param env - the environment passed by the caller
439  * \param th  - is the transaction handle to be used for the disk writes
440  * \param lqe - is the dirty quota entry which will be updated at the same time
441  *              as the slave index
442  * \param slv_obj - is the dt_object associated with the slave index
443  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
444  * \param ver   - is used to return the new version of the index.
445  * \param granted - is the new amount of quota space owned by the slave
446  *
447  * \retval    - 0 on success, appropriate error on failure
448  */
449 int qmt_slv_write(const struct lu_env *env, struct thandle *th,
450                   struct lquota_entry *lqe, struct dt_object *slv_obj,
451                   __u32 flags, __u64 *ver, __u64 granted)
452 {
453         struct qmt_thread_info  *qti = qmt_info(env);
454         struct lquota_slv_rec   *rec;
455         int                      rc;
456         ENTRY;
457
458         LASSERT(lqe != NULL);
459         LASSERT(lqe_is_master(lqe));
460         LASSERT(lqe_is_locked(lqe));
461
462         LQUOTA_DEBUG(lqe, "write slv "DFID" granted:%llu",
463                      PFID(lu_object_fid(&slv_obj->do_lu)), granted);
464
465         /* never delete the entry, otherwise, it'll not be transferred
466          * to slave during reintegration. */
467         rec = &qti->qti_slv_rec;
468
469         /* updated space granted to this slave */
470         rec->qsr_granted = granted;
471
472         /* write new granted space */
473         rc = lquota_disk_write(env, th, slv_obj, &lqe->lqe_id,
474                                (struct dt_rec *)rec, flags, ver);
475         if (rc) {
476                 LQUOTA_ERROR(lqe,
477                              "failed to update slave index "DFID" granted:%llu",
478                              PFID(lu_object_fid(&slv_obj->do_lu)),
479                              granted);
480                 RETURN(rc);
481         }
482
483         RETURN(0);
484 }
485
486 /*
487  * Check whether new limits are valid for this pool
488  *
489  * \param lqe  - is the quota entry subject to the setquota
490  * \param hard - is the new hard limit
491  * \param soft - is the new soft limit
492  */
493 int qmt_validate_limits(struct lquota_entry *lqe, __u64 hard, __u64 soft)
494 {
495         ENTRY;
496
497         if (hard != 0 && soft > hard)
498                 /* soft limit must be less than hard limit */
499                 RETURN(-EINVAL);
500         RETURN(0);
501 }
502
503 /*
504  * Set/clear edquot flag after quota space allocation/release or settings
505  * change. Slaves will be notified of changes via glimpse on per-ID lock
506  *
507  * \param lqe - is the quota entry to check
508  * \param now - is the current time in second used for grace time managment
509  */
510 bool qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
511 {
512         struct qmt_pool_info    *pool = lqe2qpi(lqe);
513         ENTRY;
514
515         if (!lqe->lqe_enforced || lqe->lqe_id.qid_uid == 0)
516                 RETURN(false);
517
518         if (!lqe->lqe_edquot) {
519                 /* space exhausted flag not set, let's check whether it is time
520                  * to set the flag */
521
522                 if (!qmt_space_exhausted(lqe, now))
523                         /* the qmt still has available space */
524                         RETURN(false);
525
526                 /* See comment in qmt_adjust_qunit(). LU-4139 */
527                 if (qmt_hard_exhausted(lqe) ||
528                     pool->qpi_rtype != LQUOTA_RES_DT) {
529                         time64_t lapse;
530
531                         /* we haven't reached the minimal qunit yet so there is
532                          * still hope that the rebalancing process might free
533                          * up some quota space */
534                         if (lqe->lqe_qunit != pool->qpi_least_qunit)
535                                 RETURN(false);
536
537                         /* least qunit value not sent to all slaves yet */
538                         if (lqe->lqe_revoke_time == 0)
539                                 RETURN(false);
540
541                         /* Let's give more time to slave to release space */
542                         lapse = ktime_get_seconds() - QMT_REBA_TIMEOUT;
543                         if (lqe->lqe_may_rel != 0 && lqe->lqe_revoke_time > lapse)
544                                 RETURN(false);
545                 } else {
546                         if (lqe->lqe_qunit > pool->qpi_soft_least_qunit)
547                                 RETURN(false);
548                 }
549
550                 /* set edquot flag */
551                 lqe->lqe_edquot = true;
552         } else {
553                 /* space exhausted flag set, let's check whether it is time to
554                  * clear it */
555
556                 if (qmt_space_exhausted(lqe, now))
557                         /* the qmt still has not space */
558                         RETURN(false);
559
560                 if (lqe->lqe_hardlimit != 0 &&
561                     lqe->lqe_granted + pool->qpi_least_qunit >
562                                                         lqe->lqe_hardlimit)
563                         /* we clear the flag only once at least one least qunit
564                          * is available */
565                         RETURN(false);
566
567                 /* clear edquot flag */
568                 lqe->lqe_edquot = false;
569         }
570
571         LQUOTA_DEBUG(lqe, "changing edquot flag");
572
573         /* let's notify slave by issuing glimpse on per-ID lock.
574          * the rebalance thread will take care of this */
575         RETURN(true);
576 }
577
578 /* Using least_qunit when over block softlimit will seriously impact the
579  * write performance, we need to do some special tweaking on that. */
580 static __u64 qmt_calc_softlimit(struct lquota_entry *lqe, bool *oversoft)
581 {
582         struct qmt_pool_info *pool = lqe2qpi(lqe);
583
584         LASSERT(lqe->lqe_softlimit != 0);
585         *oversoft = false;
586         /* No need to do special tweaking for inode limit */
587         if (pool->qpi_rtype != LQUOTA_RES_DT)
588                 return lqe->lqe_softlimit;
589
590         if (lqe->lqe_granted <= lqe->lqe_softlimit +
591                                 pool->qpi_soft_least_qunit) {
592                 return lqe->lqe_softlimit;
593         } else if (lqe->lqe_hardlimit != 0) {
594                 *oversoft = true;
595                 return lqe->lqe_hardlimit;
596         } else {
597                 *oversoft = true;
598                 return 0;
599         }
600 }
601
602 /*
603  * Try to grant more quota space back to slave.
604  *
605  * \param lqe     - is the quota entry for which we would like to allocate more
606  *                  space
607  * \param granted - is how much was already granted as part of the request
608  *                  processing
609  * \param spare   - is how much unused quota space the slave already owns
610  *
611  * \retval return how additional space can be granted to the slave
612  */
613 __u64 qmt_alloc_expand(struct lquota_entry *lqe, __u64 granted, __u64 spare)
614 {
615         struct qmt_pool_info    *pool = lqe2qpi(lqe);
616         __u64                    remaining, qunit;
617         int                      slv_cnt;
618
619         LASSERT(lqe->lqe_enforced && lqe->lqe_qunit != 0);
620
621         slv_cnt = qpi_slv_nr(lqe2qpi(lqe), lqe_qtype(lqe));
622         qunit = lqe->lqe_qunit;
623
624         /* See comment in qmt_adjust_qunit(). LU-4139. */
625         if (lqe->lqe_softlimit != 0) {
626                 bool oversoft;
627                 remaining = qmt_calc_softlimit(lqe, &oversoft);
628                 if (remaining == 0)
629                         remaining = lqe->lqe_granted +
630                                     pool->qpi_soft_least_qunit;
631         } else {
632                 remaining = lqe->lqe_hardlimit;
633         }
634
635         if (lqe->lqe_granted >= remaining)
636                 RETURN(0);
637
638         remaining -= lqe->lqe_granted;
639
640         do {
641                 if (spare >= qunit)
642                         break;
643
644                 granted &= (qunit - 1);
645
646                 if (remaining > (slv_cnt * qunit) >> 1) {
647                         /* enough room to grant more space w/o additional
648                          * shrinking ... at least for now */
649                         remaining -= (slv_cnt * qunit) >> 1;
650                 } else if (qunit != pool->qpi_least_qunit) {
651                         qunit >>= 2;
652                         continue;
653                 }
654
655                 granted &= (qunit - 1);
656                 if (spare > 0)
657                         RETURN(min_t(__u64, qunit - spare, remaining));
658                 else
659                         RETURN(min_t(__u64, qunit - granted, remaining));
660         } while (qunit >= pool->qpi_least_qunit);
661
662         RETURN(0);
663 }
664
665 static inline void
666 qmt_adjust_qunit_set_revoke(const struct lu_env *env, struct lquota_entry *lqe,
667                             unsigned long least_qunit)
668 {
669         struct lquota_entry *lqe2;
670         time64_t min = 0;
671         int i;
672
673         if (qti_lqes_cnt(env) <= 1)
674                 return;
675
676         for (i = 0; i < qti_lqes_cnt(env); i++) {
677                 lqe2 = qti_lqes(env)[i];
678                 if ((lqe2->lqe_qunit == least_qunit) && lqe2->lqe_revoke_time) {
679                         if (!min) {
680                                 min = lqe2->lqe_revoke_time;
681                                 continue;
682                         }
683                         min = lqe2->lqe_revoke_time < min ?
684                                 lqe2->lqe_revoke_time : min;
685                 }
686         }
687
688         lqe->lqe_revoke_time = min;
689 }
690
691
692 /*
693  * Adjust qunit size according to quota limits and total granted count.
694  * The caller must have locked the lqe.
695  *
696  * \param env - the environment passed by the caller
697  * \param lqe - is the qid entry to be adjusted
698  * \retval true - need reseed glbe array
699  */
700 bool qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
701 {
702         struct qmt_pool_info    *pool = lqe2qpi(lqe);
703         bool                     need_reseed = false;
704         int                      slv_cnt;
705         __u64                    qunit, limit, qunit2 = 0;
706         ENTRY;
707
708         LASSERT(lqe_is_locked(lqe));
709
710         if (!lqe->lqe_enforced || lqe->lqe_id.qid_uid == 0)
711                 /* no quota limits */
712                 RETURN(need_reseed);
713
714         /* record how many slaves have already registered */
715         slv_cnt = qpi_slv_nr(pool, lqe_qtype(lqe));
716         if (slv_cnt == 0) {
717                 /* Pool hasn't slaves anymore. Qunit will be adjusted
718                  * again when new slaves would be added. */
719                 if (lqe->lqe_qunit) {
720                         qunit = 0;
721                         GOTO(done, qunit);
722                 }
723                 /* wait for at least one slave to join */
724                 RETURN(need_reseed);
725         }
726
727         /* Qunit calculation is based on soft limit, if any, hard limit
728          * otherwise. This means that qunit is shrunk to the minimum when
729          * beyond the soft limit. This will impact performance, but that's the
730          * price of an accurate grace time management. */
731         if (lqe->lqe_softlimit != 0) {
732                 bool oversoft;
733                 /* As a compromise of write performance and the grace time
734                  * accuracy, the block qunit size will be shrunk to
735                  * qpi_soft_least_qunit when over softlimit. LU-4139. */
736                 limit = qmt_calc_softlimit(lqe, &oversoft);
737                 if (oversoft)
738                         qunit2 = pool->qpi_soft_least_qunit;
739                 if (limit == 0)
740                         GOTO(done, qunit = qunit2);
741         } else if (lqe->lqe_hardlimit != 0) {
742                 limit = lqe->lqe_hardlimit;
743         } else {
744                 LQUOTA_ERROR(lqe, "enforced bit set, but neither hard nor soft "
745                              "limit are set");
746                 RETURN(need_reseed);
747         }
748
749         qunit = lqe->lqe_qunit == 0 ? pool->qpi_least_qunit : lqe->lqe_qunit;
750
751         /* The qunit value is computed as follows: limit / (2 * slv_cnt).
752          * Then 75% of the quota space can be granted with current qunit value.
753          * The remaining 25% are then used with reduced qunit size (by a factor
754          * of 4) which is then divided in a similar manner.
755          *
756          * |---------------------limit---------------------|
757          * |-------limit / 2-------|-limit / 4-|-limit / 4-|
758          * |qunit|qunit|qunit|qunit|           |           |
759          * |----slv_cnt * qunit----|           |           |
760          * |-grow limit-|          |           |           |
761          * |--------------shrink limit---------|           |
762          * |---space granted in qunit chunks---|-remaining-|
763          *                                    /             \
764          *                                   /               \
765          *                                  /                 \
766          *                                 /                   \
767          *                                /                     \
768          *     qunit >>= 2;            |qunit*slv_cnt|qunit*slv_cnt|
769          *                             |---space in qunit---|remain|
770          *                                  ...                               */
771         if (qunit == pool->qpi_least_qunit ||
772             limit >= lqe->lqe_granted + ((slv_cnt * qunit) >> 1)) {
773                 /* current qunit value still fits, let's see if we can afford to
774                  * increase qunit now ...
775                  * To increase qunit again, we have to be under 25% */
776                 while (qunit && limit >= lqe->lqe_granted + 6 * qunit * slv_cnt)
777                         qunit <<= 2;
778
779                 if (!qunit) {
780                         qunit = limit;
781                         do_div(qunit, 2 * slv_cnt);
782                 }
783
784         } else {
785                 /* shrink qunit until we find a suitable value */
786                 while (qunit > pool->qpi_least_qunit &&
787                        limit < lqe->lqe_granted + ((slv_cnt * qunit) >> 1))
788                         qunit >>= 2;
789         }
790
791         if (qunit2 && qunit > qunit2)
792                 qunit = qunit2;
793 done:
794         if (lqe->lqe_qunit == qunit)
795                 /* keep current qunit */
796                 RETURN(need_reseed);
797
798         LQUOTA_DEBUG(lqe, "%s qunit to %llu",
799                      lqe->lqe_qunit < qunit ? "increasing" : "decreasing",
800                      qunit);
801
802         /* store new qunit value */
803         swap(lqe->lqe_qunit, qunit);
804
805         /* reseed glbe array and notify
806          * slave if qunit was shrinked */
807         need_reseed = true;
808         /* reset revoke time */
809         lqe->lqe_revoke_time = 0;
810
811         if (lqe->lqe_qunit == pool->qpi_least_qunit) {
812                 if (lqe->lqe_qunit >= qunit)
813                         /* initial qunit value is the smallest one */
814                         lqe->lqe_revoke_time = ktime_get_seconds();
815                 /* If there are several lqes and lqe_revoke_time is set for
816                  * some of them, it means appropriate OSTs have been already
817                  * notified with the least qunit and there is no chance to
818                  * free more space. Find an lqe with the minimum(earliest)
819                  * revoke_time and set this time to the current one.
820                  */
821                 qmt_adjust_qunit_set_revoke(env, lqe, pool->qpi_least_qunit);
822         }
823         RETURN(need_reseed);
824 }
825
826 bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env,
827                                     struct qmt_device *qmt,
828                                     __u64 now, bool edquot,
829                                     bool qunit, __u32 qb_flags,
830                                     int idx)
831 {
832         struct lquota_entry *lqe_gl, *lqe;
833         bool need_reseed = false;
834         bool need_notify = false;
835         int i;
836
837         lqe_gl = qti_lqes_glbl(env);
838
839         for (i = 0; i < qti_lqes_cnt(env); i++) {
840                 lqe = qti_lqes(env)[i];
841                 if (qunit)
842                         need_reseed |= qmt_adjust_qunit(env, lqe);
843                 if (edquot)
844                         need_reseed |= qmt_adjust_edquot(lqe, now);
845         }
846
847         LASSERT(lqe_gl);
848         if (!lqe_gl->lqe_glbl_data &&
849             (req_has_rep(qb_flags) || req_is_rel(qb_flags))) {
850                 if (need_reseed)
851                         CDEBUG(D_QUOTA,
852                                "%s: can not notify - lge_glbl_data is not set\n",
853                                qmt->qmt_svname);
854                 return need_reseed;
855         }
856
857         if (need_reseed || idx >= 0) {
858                 mutex_lock(&lqe_gl->lqe_glbl_data_lock);
859                 if (lqe_gl->lqe_glbl_data) {
860                         struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
861
862                         if (need_reseed) {
863                                 qmt_seed_glbe_all(env, lgd, qunit, edquot);
864                         } else if (idx >= 0) {
865                                 LASSERT(idx <= lgd->lqeg_num_used);
866                                 /* If there are no locks yet when
867                                  * lge_qunit/edquot_nu is set, slaves
868                                  * are still not notified with new
869                                  * qunit/edquot value. In a such case
870                                  * we need to notify them with new values to
871                                  * avoid endless EINPROGRESS if qunit is equal
872                                  * to the least qunit, but lqe_revoke_time is
873                                  * still not set.
874                                  */
875                                 need_notify = lgd->lqeg_arr[idx].lge_qunit_nu ||
876                                               lgd->lqeg_arr[idx].lge_edquot_nu;
877                         }
878                 }
879                 mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
880         }
881
882         if (need_reseed || need_notify)
883                 qmt_id_lock_notify(qmt, lqe_gl);
884
885         return need_reseed;
886 }
887
888
889 /*
890  * Adjust qunit & edquot flag in case it wasn't initialized already (e.g.
891  * limit set while no slaves were connected yet)
892  */
893 bool qmt_revalidate(const struct lu_env *env, struct lquota_entry *lqe)
894 {
895         bool need_notify = false;
896
897         if (lqe->lqe_qunit == 0) {
898                 /* lqe was read from disk, but neither qunit, nor edquot flag
899                  * were initialized */
900                 need_notify = qmt_adjust_qunit(env, lqe);
901                 if (lqe->lqe_qunit != 0)
902                         need_notify |= qmt_adjust_edquot(lqe,
903                                                 ktime_get_real_seconds());
904         }
905
906         return need_notify;
907 }
908
909 void qmt_revalidate_lqes(const struct lu_env *env,
910                          struct qmt_device *qmt, __u32 qb_flags)
911 {
912         struct lquota_entry *lqe_gl = qti_lqes_glbl(env);
913         bool need_notify = false;
914         int i;
915
916         for (i = 0; i < qti_lqes_cnt(env); i++)
917                 need_notify |= qmt_revalidate(env, qti_lqes(env)[i]);
918
919         if (!need_notify)
920                 return;
921
922         /* There could be no ID lock to the moment of reconciliation.
923          * As a result lqe global data is not initialised yet. It is ok
924          * for release and report requests. */
925         if (!lqe_gl->lqe_glbl_data &&
926             (req_is_rel(qb_flags) || req_has_rep(qb_flags))) {
927                 return;
928         }
929
930         mutex_lock(&lqe_gl->lqe_glbl_data_lock);
931         if (lqe_gl->lqe_glbl_data)
932                 qmt_seed_glbe(env, lqe_gl->lqe_glbl_data);
933         mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
934
935         qmt_id_lock_notify(qmt, lqe_gl);
936 }
937
938 void qti_lqes_init(const struct lu_env *env)
939 {
940         struct qmt_thread_info  *qti = qmt_info(env);
941
942         qti->qti_lqes_cnt = 0;
943         qti->qti_glbl_lqe_idx = 0;
944         qti->qti_lqes_num = QMT_MAX_POOL_NUM;
945 }
946
947 int qti_lqes_add(const struct lu_env *env, struct lquota_entry *lqe)
948 {
949         struct qmt_thread_info  *qti = qmt_info(env);
950
951         if (qti->qti_lqes_cnt >= qti->qti_lqes_num) {
952                 struct lquota_entry     **lqes;
953                 lqes = qti->qti_lqes;
954                 OBD_ALLOC(lqes, sizeof(lqe) * qti->qti_lqes_num * 2);
955                 if (!lqes)
956                         return -ENOMEM;
957                 memcpy(lqes, qti_lqes(env), qti->qti_lqes_cnt * sizeof(lqe));
958                 /* Don't need to free, if it is the very 1st allocation */
959                 if (qti->qti_lqes_num > QMT_MAX_POOL_NUM)
960                         OBD_FREE(qti->qti_lqes,
961                                  qti->qti_lqes_num * sizeof(lqe));
962                 qti->qti_lqes = lqes;
963                 qti->qti_lqes_num *= 2;
964         }
965
966         if (lqe->lqe_is_global)
967                 qti->qti_glbl_lqe_idx = qti->qti_lqes_cnt;
968         qti_lqes(env)[qti->qti_lqes_cnt++] = lqe;
969
970         /* The pool could be accessed directly from lqe, so take
971          * extra reference that is put in qti_lqes_fini */
972         qpi_getref(lqe2qpi(lqe));
973
974         CDEBUG(D_QUOTA, "LQE %p %lu is added, lqe_cnt %d lqes_num %d\n",
975                          lqe, (long unsigned)lqe->lqe_id.qid_uid,
976                          qti->qti_lqes_cnt, qti->qti_lqes_num);
977         LASSERT(qti->qti_lqes_num != 0);
978
979         return 0;
980 }
981
982 void qti_lqes_del(const struct lu_env *env, int index)
983 {
984         struct lquota_entry     **lqes;
985         int lqes_cnt = qti_lqes_cnt(env);
986         int lqep_size = sizeof(struct lquota_entry *);
987
988         if (index == 0) {
989                 /* We can't handle non global lqes correctly without
990                  * global lqe located at index 0. If we try to do so,
991                  * something goes wrong. */
992                 LQUOTA_ERROR(qti_lqes_glbl(env),
993                              "quota: cannot remove lqe at index 0 as it is global");
994                 LASSERT(qti_lqes_glbl(env)->lqe_is_global);
995                 return;
996         }
997         lqes = qti_lqes(env);
998         qpi_putref(env, lqe2qpi(lqes[index]));
999         lqe_putref(lqes[index]);
1000         memcpy((unsigned char *)lqes + index * lqep_size,
1001                (unsigned char *)lqes + (index + 1) * lqep_size,
1002                (lqes_cnt - index - 1) * lqep_size);
1003         qti_lqes_cnt(env)--;
1004 }
1005
1006 void qti_lqes_fini(const struct lu_env *env)
1007 {
1008         struct qmt_thread_info  *qti = qmt_info(env);
1009         struct lquota_entry     **lqes = qti->qti_lqes;
1010         int i;
1011
1012         lqes = qti_lqes(env);
1013         for (i = 0; i < qti->qti_lqes_cnt; i++) {
1014                 qpi_putref(env, lqe2qpi(lqes[i]));
1015                 lqe_putref(lqes[i]);
1016         }
1017
1018         if (qti->qti_lqes_num > QMT_MAX_POOL_NUM)
1019                 OBD_FREE(qti->qti_lqes,
1020                          qti->qti_lqes_num * sizeof(struct lquota_entry *));
1021
1022         qti->qti_lqes_num = 0;
1023         qti->qti_lqes_cnt = 0;
1024 }
1025
1026 __u64 qti_lqes_min_qunit(const struct lu_env *env)
1027 {
1028         __u64 min, qunit;
1029         int i;
1030
1031         for (i = 1, min = qti_lqe_qunit(env, 0); i < qti_lqes_cnt(env); i++) {
1032                 qunit = qti_lqe_qunit(env, i);
1033                 /* if qunit is 0, lqe is not enforced and we can ignore it */
1034                 if (qunit && qunit < min)
1035                         min = qunit;
1036         }
1037
1038         return min;
1039 }
1040
1041 int qti_lqes_edquot(const struct lu_env *env)
1042 {
1043         int i;
1044
1045         for (i = 0; i < qti_lqes_cnt(env); i++) {
1046                 if (qti_lqes(env)[i]->lqe_edquot)
1047                         return 1;
1048         }
1049
1050         return 0;
1051 }
1052
1053 int qti_lqes_restore_init(const struct lu_env *env)
1054 {
1055         int rc = 0;
1056
1057         if (qti_lqes_inited(env) && qti_lqes_cnt(env) > QMT_MAX_POOL_NUM) {
1058                 OBD_ALLOC(qmt_info(env)->qti_lqes_rstr,
1059                           qti_lqes_cnt(env) * sizeof(struct qmt_lqe_restore));
1060                 if (!qti_lqes_rstr(env))
1061                         rc = -ENOMEM;
1062         }
1063
1064         return rc;
1065 }
1066
1067 void qti_lqes_restore_fini(const struct lu_env *env)
1068 {
1069         if (qti_lqes_inited(env) && qti_lqes_cnt(env) > QMT_MAX_POOL_NUM)
1070                 OBD_FREE(qmt_info(env)->qti_lqes_rstr,
1071                          qti_lqes_cnt(env) * sizeof(struct qmt_lqe_restore));
1072 }
1073
1074 void qti_lqes_write_lock(const struct lu_env *env)
1075 {
1076         int i;
1077
1078         for (i = 0; i < qti_lqes_cnt(env); i++)
1079                 lqe_write_lock(qti_lqes(env)[i]);
1080 }
1081
1082 void qti_lqes_write_unlock(const struct lu_env *env)
1083 {
1084         int i;
1085
1086         for (i = 0; i < qti_lqes_cnt(env); i++)
1087                 lqe_write_unlock(qti_lqes(env)[i]);
1088 }
1089
1090 #define QMT_INIT_SLV_CNT        64
1091 struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *pool, int qtype)
1092 {
1093         struct lqe_glbl_data    *lgd;
1094         struct lqe_glbl_entry   *lqeg_arr;
1095         int                      slv_cnt, glbe_num;
1096
1097         OBD_ALLOC(lgd, sizeof(struct lqe_glbl_data));
1098         if (!lgd)
1099                 RETURN(NULL);
1100
1101         slv_cnt = qpi_slv_nr_by_rtype(pool, qtype);
1102
1103         glbe_num = slv_cnt < QMT_INIT_SLV_CNT ? QMT_INIT_SLV_CNT : slv_cnt;
1104         OBD_ALLOC(lqeg_arr, sizeof(struct lqe_glbl_entry) * glbe_num);
1105         if (!lqeg_arr) {
1106                 OBD_FREE(lgd, sizeof(struct lqe_glbl_data));
1107                 RETURN(NULL);
1108         }
1109
1110         CDEBUG(D_QUOTA, "slv_cnt %d glbe_num %d\n", slv_cnt, glbe_num);
1111
1112         lgd->lqeg_num_used = slv_cnt;
1113         lgd->lqeg_num_alloc = glbe_num;
1114         lgd->lqeg_arr = lqeg_arr;
1115
1116         RETURN(lgd);
1117 }
1118
1119 void qmt_free_lqe_gd(struct lqe_glbl_data *lgd)
1120 {
1121         OBD_FREE(lgd->lqeg_arr,
1122                  sizeof(struct lqe_glbl_entry) * lgd->lqeg_num_alloc);
1123         OBD_FREE(lgd, sizeof(struct lqe_glbl_data));
1124 }
1125
1126 void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd,
1127                        bool qunit, bool edquot)
1128 {
1129         struct rw_semaphore     *sem = NULL;
1130         struct qmt_pool_info    *qpi;
1131         int                      i, j, idx;
1132         ENTRY;
1133
1134         if (!qti_lqes_cnt(env))
1135                 RETURN_EXIT;
1136         /* lqes array is sorted by qunit - the first entry has minimum qunit.
1137          * Thus start seeding global qunit's array beginning from the 1st lqe
1138          * and appropriate pool. If pools overlapped, slaves from this
1139          * overlapping get minimum qunit value.
1140          * user1: pool1, pool2, pool_glbl;
1141          * pool1: OST1; user1_qunit = 10M;
1142          * pool2: OST0, OST1, OST2; user1_qunit = 30M;
1143          * pool_glbl: OST0, OST1, OST2, OST3; user1_qunit = 160M;
1144          * qunit array after seeding should be:
1145          * OST0: 30M; OST1: 10M; OST2: 30M; OST3: 160M; */
1146
1147         /* edquot resetup algorythm works fine
1148          * with not sorted lqes */
1149         if (qunit)
1150                 qmt_lqes_sort(env);
1151
1152         for (i = 0; i < lgd->lqeg_num_used; i++) {
1153                 lgd->lqeg_arr[i].lge_qunit_set = 0;
1154                 lgd->lqeg_arr[i].lge_qunit_nu = 0;
1155                 lgd->lqeg_arr[i].lge_edquot_nu = 0;
1156         }
1157
1158         for (i = 0; i < qti_lqes_cnt(env); i++) {
1159                 struct lquota_entry *lqe = qti_lqes(env)[i];
1160                 int slaves_cnt;
1161
1162                 CDEBUG(D_QUOTA, "lqes_cnt %d, i %d\n", qti_lqes_cnt(env), i);
1163                 qpi = lqe2qpi(lqe);
1164                 if (qmt_pool_global(qpi)) {
1165                         slaves_cnt = qpi_slv_nr_by_rtype(lqe2qpi(lqe),
1166                                                          lqe_qtype(lqe));
1167                 } else {
1168                         sem = qmt_sarr_rwsem(qpi);
1169                         down_read(sem);
1170                         slaves_cnt = qmt_sarr_count(qpi);
1171                 }
1172
1173                 for (j = 0; j < slaves_cnt; j++) {
1174                         idx = qmt_sarr_get_idx(qpi, j);
1175                         LASSERT(idx >= 0);
1176
1177                         if (edquot) {
1178                                 int lge_edquot, new_edquot, edquot_nu;
1179
1180                                 lge_edquot = lgd->lqeg_arr[idx].lge_edquot;
1181                                 edquot_nu = lgd->lqeg_arr[idx].lge_edquot_nu;
1182                                 new_edquot = lqe->lqe_edquot;
1183
1184                                 if (lge_edquot == new_edquot ||
1185                                     (edquot_nu && lge_edquot == 1))
1186                                         goto qunit_lbl;
1187                                 lgd->lqeg_arr[idx].lge_edquot = new_edquot;
1188                                 /* it is needed for the following case:
1189                                  * initial values for idx i -
1190                                  * lqe_edquot = 1, lqe_edquot_nu == 0;
1191                                  * 1: new_edquot == 0 ->
1192                                  *      lqe_edquot = 0, lqe_edquot_nu = 1;
1193                                  * 2: new_edquot == 1 ->
1194                                  *      lqe_edquot = 1, lqe_edquot_nu = 0;
1195                                  * At the 2nd iteration lge_edquot comes back
1196                                  * to 1, so no changes and we don't need
1197                                  * to notify slave. */
1198                                 lgd->lqeg_arr[idx].lge_edquot_nu = !edquot_nu;
1199                         }
1200 qunit_lbl:
1201                         if (qunit) {
1202                                 __u64 lge_qunit, new_qunit;
1203
1204                                 CDEBUG(D_QUOTA,
1205                                        "idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n",
1206                                        idx, lgd->lqeg_arr[idx].lge_qunit_set,
1207                                        lgd->lqeg_arr[idx].lge_qunit,
1208                                        lqe->lqe_qunit);
1209                                 /* lge for this idx is already set
1210                                  * on previous iteration */
1211                                 if (lgd->lqeg_arr[idx].lge_qunit_set)
1212                                         continue;
1213                                 lge_qunit = lgd->lqeg_arr[idx].lge_qunit;
1214                                 new_qunit = lqe->lqe_qunit;
1215                                 /* qunit could be not set,
1216                                  * so use global lqe's qunit */
1217                                 if (!new_qunit)
1218                                         continue;
1219
1220                                 if (lge_qunit != new_qunit)
1221                                         lgd->lqeg_arr[idx].lge_qunit =
1222                                                                 new_qunit;
1223
1224                                 /* TODO: initially slaves notification was done
1225                                  * only for qunit shrinking. Should we always
1226                                  * notify slaves with new qunit ? */
1227                                 if (lge_qunit > new_qunit)
1228                                         lgd->lqeg_arr[idx].lge_qunit_nu = 1;
1229                                 lgd->lqeg_arr[idx].lge_qunit_set = 1;
1230                         }
1231                 }
1232
1233                 if (!qmt_pool_global(qpi))
1234                         up_read(sem);
1235         }
1236         /* TODO: only for debug purposes - remove it later */
1237         for (i = 0; i < lgd->lqeg_num_used; i++)
1238                 CDEBUG(D_QUOTA,
1239                         "lgd ost %d, qunit %lu nu %d;  edquot %d nu %d\n",
1240                         i, (long unsigned)lgd->lqeg_arr[i].lge_qunit,
1241                         lgd->lqeg_arr[i].lge_qunit_nu,
1242                         lgd->lqeg_arr[i].lge_edquot,
1243                         lgd->lqeg_arr[i].lge_edquot_nu);
1244
1245         EXIT;
1246 }
1247
1248 void qmt_setup_lqe_gd(const struct lu_env *env, struct qmt_device *qmt,
1249                       struct lquota_entry *lqe, struct lqe_glbl_data *lgd,
1250                       int pool_type)
1251 {
1252         __u64                    qunit;
1253         bool                     edquot;
1254         int                      i;
1255
1256         qunit = lqe->lqe_qunit;
1257         edquot = lqe->lqe_edquot;
1258
1259         /* Firstly set all elements in array with
1260          * qunit and edquot of global pool */
1261         for (i = 0; i < lgd->lqeg_num_used; i++) {
1262                 lgd->lqeg_arr[i].lge_qunit = qunit;
1263                 lgd->lqeg_arr[i].lge_edquot = edquot;
1264                 /* It is the very first lvb setup - qunit and other flags
1265                  * will be sent to slaves during qmt_lvbo_fill. */
1266                 lgd->lqeg_arr[i].lge_qunit_nu = 0;
1267                 lgd->lqeg_arr[i].lge_edquot_nu = 0;
1268         }
1269
1270         qmt_pool_lqes_lookup_spec(env, qmt, pool_type,
1271                                   lqe_qtype(lqe), &lqe->lqe_id);
1272         qmt_seed_glbe(env, lgd);
1273
1274         lqe->lqe_glbl_data = lgd;
1275         qmt_id_lock_notify(qmt, lqe);
1276
1277         qti_lqes_fini(env);
1278 }