Whamcloud - gitweb
66c4aa42a8b0115376eae0fde2ef14be0dff2254
[fs/lustre-release.git] / lustre / quota / qmt_pool.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * A Quota Master Target has a list(qmt_pool_list) where it stores qmt_pool_info
33  * structures. There is one such structure for each pool managed by the QMT.
34  *
35  * Each pool can have different quota types enforced (typically user & group
36  * quota). A pool is in charge of managing lquota_entry structures for each
37  * quota type. This is done by creating one lquota_entry site per quota
38  * type. A site stores entries in a hash table and read quota settings from disk
39  * when a given ID isn't present in the hash.
40  *
41  * The pool API exported here is the following:
42  * - qmt_pool_init(): initializes the general QMT structures used to manage
43  *                    pools.
44  * - qmt_pool_fini(): frees the structures allocated by qmt_pool_fini().
45  * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46  * - qmt_pool_new_conn(): is used to create a new slave index file.
47  * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
48  *                          a given ID.
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
56
57 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
58 #define qmt_sarr_pool_add(qpi, idx, stype) \
59                 _qmt_sarr_pool_add(qpi, idx, stype, false)
60 #define qmt_sarr_pool_add_locked(qpi, idx, stype) \
61                 _qmt_sarr_pool_add(qpi, idx, stype, true)
62 static inline int _qmt_sarr_pool_add(struct qmt_pool_info *qpi,
63                                     int idx, int min, bool locked);
64 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
65 static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi);
66 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
67 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
68
69 /*
70  * Static helper functions not used outside the scope of this file
71  */
72
73 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
74 {
75         LASSERT(atomic_read(&pool->qpi_ref) > 1);
76         atomic_dec(&pool->qpi_ref);
77 }
78
79 /* some procfs helpers */
80 static int qpi_state_seq_show(struct seq_file *m, void *data)
81 {
82         struct qmt_pool_info    *pool = m->private;
83         int                      type;
84
85         LASSERT(pool != NULL);
86         if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
87                 return -ENOENT;
88
89         seq_printf(m, "pool:\n"
90                    "    id: %u\n"
91                    "    type: %s\n"
92                    "    refcount: %d\n"
93                    "    least_qunit: %lu\n",
94                    0,
95                    RES_NAME(pool->qpi_rtype),
96                    atomic_read(&pool->qpi_ref),
97                    pool->qpi_least_qunit);
98
99         for (type = 0; type < LL_MAXQUOTAS; type++)
100                 seq_printf(m, "    %s:\n"
101                            "        quota_servers: %d\n"
102                            "        quota_entries: %d\n",
103                            qtype_name(type),
104                            qpi_slv_nr(pool, type),
105                     atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
106
107         return 0;
108 }
109 LPROC_SEQ_FOPS_RO(qpi_state);
110
111 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
112 {
113         struct qmt_pool_info    *pool = m->private;
114         LASSERT(pool != NULL);
115         if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
116                 return -ENOENT;
117
118         seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
119         return 0;
120 }
121
122 static ssize_t
123 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
124                                size_t count, loff_t *off)
125 {
126         struct seq_file *m = file->private_data;
127         struct qmt_pool_info *pool = m->private;
128         long long least_qunit, qunit;
129         int rc;
130
131         LASSERT(pool != NULL);
132         if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
133                 return -ENOENT;
134
135         /* Not tuneable for inode limit */
136         if (pool->qpi_rtype != LQUOTA_RES_DT)
137                 return -EINVAL;
138
139         rc = kstrtoll_from_user(buffer, count, 0, &least_qunit);
140         if (rc)
141                 return rc;
142
143         /* Miminal qpi_soft_least_qunit */
144         qunit = pool->qpi_least_qunit << 2;
145         /* The value must be power of miminal qpi_soft_least_qunit, see
146          * how the qunit is adjusted in qmt_adjust_qunit(). */
147         while (qunit > 0 && qunit < least_qunit)
148                 qunit <<= 2;
149         if (qunit <= 0)
150                 qunit = INT_MAX & ~3;
151
152         pool->qpi_soft_least_qunit = qunit;
153         return count;
154 }
155 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
156
157 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
158         { .name =       "info",
159           .fops =       &qpi_state_fops },
160         { .name =       "soft_least_qunit",
161           .fops =       &qpi_soft_least_qunit_fops },
162         { NULL }
163 };
164
165 /*
166  * Allocate a new qmt_pool_info structure and add it to qmt_pool_list.
167  *
168  * \param env       - is the environment passed by the caller
169  * \param qmt       - is the quota master target
170  * \param pool_type - is the resource type of this pool instance, either
171  *                    LQUOTA_RES_MD or LQUOTA_RES_DT.
172  *
173  * \retval - 0 on success, appropriate error on failure
174  */
175 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
176                           char *pool_name, int pool_type)
177 {
178         struct qmt_thread_info  *qti = qmt_info(env);
179         struct qmt_pool_info    *pool;
180         int                      rc = 0;
181         ENTRY;
182
183         OBD_ALLOC_PTR(pool);
184         if (pool == NULL)
185                 RETURN(-ENOMEM);
186         INIT_LIST_HEAD(&pool->qpi_linkage);
187         init_rwsem(&pool->qpi_recalc_sem);
188
189         pool->qpi_rtype = pool_type;
190         pool->qpi_flags = 0;
191
192         /* initialize refcount to 1, hash table will then grab an additional
193          * reference */
194         atomic_set(&pool->qpi_ref, 1);
195
196         /* set up least qunit size to use for this pool */
197         pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
198         if (pool_type == LQUOTA_RES_DT)
199                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
200         else
201                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
202
203         /* grab reference on master target that this pool belongs to */
204         lu_device_get(qmt2lu_dev(qmt));
205         lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
206         pool->qpi_qmt = qmt;
207
208         /* create pool proc directory */
209         snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
210                  RES_NAME(pool_type), pool_name);
211         strncpy(pool->qpi_name, pool_name, QPI_MAXNAME);
212         pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
213                                           lprocfs_quota_qpi_vars, pool);
214         if (IS_ERR(pool->qpi_proc)) {
215                 rc = PTR_ERR(pool->qpi_proc);
216                 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
217                        qmt->qmt_svname, qti->qti_buf, rc);
218                 pool->qpi_proc = NULL;
219                 GOTO(out, rc);
220         }
221
222         rc = qmt_sarr_pool_init(pool);
223         if (rc)
224                 GOTO(out, rc);
225
226         /* add to qmt pool list */
227         down_write(&qmt->qmt_pool_lock);
228         list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
229         up_write(&qmt->qmt_pool_lock);
230         EXIT;
231 out:
232         if (rc)
233                 /* this frees the pool structure since refcount is equal to 1 */
234                 qpi_putref(env, pool);
235         return rc;
236 }
237
238 /*
239  * Delete a qmt_pool_info instance and all structures associated.
240  *
241  * \param env  - is the environment passed by the caller
242  * \param pool - is the qmt_pool_info structure to free
243  */
244 void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
245 {
246         struct  qmt_device *qmt = pool->qpi_qmt;
247         int     qtype;
248         ENTRY;
249
250         /* remove from list */
251         down_write(&qmt->qmt_pool_lock);
252         list_del_init(&pool->qpi_linkage);
253         up_write(&qmt->qmt_pool_lock);
254
255         if (atomic_read(&pool->qpi_ref) > 0)
256                 RETURN_EXIT;
257
258         qmt_stop_pool_recalc(pool);
259         qmt_sarr_pool_free(pool);
260
261         /* release proc entry */
262         if (pool->qpi_proc) {
263                 lprocfs_remove(&pool->qpi_proc);
264                 pool->qpi_proc = NULL;
265         }
266
267         /* release per-quota type site used to manage quota entries as well as
268          * references to global index files */
269         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
270                 /* release lqe storing grace time */
271                 if (pool->qpi_grace_lqe[qtype] != NULL)
272                         lqe_putref(pool->qpi_grace_lqe[qtype]);
273
274                 /* release site */
275                 if (pool->qpi_site[qtype] != NULL &&
276                     !IS_ERR(pool->qpi_site[qtype]))
277                         lquota_site_free(env, pool->qpi_site[qtype]);
278                 /* release reference to global index */
279                 if (pool->qpi_glb_obj[qtype] != NULL &&
280                     !IS_ERR(pool->qpi_glb_obj[qtype]))
281                         dt_object_put(env, pool->qpi_glb_obj[qtype]);
282         }
283
284         /* release reference on pool directory */
285         if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
286                 dt_object_put(env, pool->qpi_root);
287
288         /* release reference on the master target */
289         if (pool->qpi_qmt != NULL) {
290                 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
291
292                 lu_ref_del(&ld->ld_reference, "pool", pool);
293                 lu_device_put(ld);
294                 pool->qpi_qmt = NULL;
295         }
296
297         LASSERT(list_empty(&pool->qpi_linkage));
298         OBD_FREE_PTR(pool);
299 }
300
301 static inline void qti_pools_init(const struct lu_env *env)
302 {
303         struct qmt_thread_info  *qti = qmt_info(env);
304
305         qti->qti_pools_cnt = 0;
306         qti->qti_pools_num = QMT_MAX_POOL_NUM;
307 }
308
309 #define qti_pools(qti)  (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
310                                 qti->qti_pools : qti->qti_pools_small)
311 #define qti_pools_env(env) \
312         (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
313                 qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
314 #define qti_pools_cnt(env)      (qmt_info(env)->qti_pools_cnt)
315
316 static inline int qti_pools_add(const struct lu_env *env,
317                                 struct qmt_pool_info *qpi)
318 {
319         struct qmt_thread_info  *qti = qmt_info(env);
320         struct qmt_pool_info    **pools = qti->qti_pools;
321
322         pools = qti_pools(qti);
323         LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
324                  "Forgot init? %px\n", qti);
325
326         if (qti->qti_pools_cnt >= qti->qti_pools_num) {
327                 OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
328                 if (!pools)
329                         return -ENOMEM;
330                 memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
331                 /* Don't need to free, if it is the very 1st allocation */
332                 if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
333                         OBD_FREE(qti->qti_pools,
334                                  qti->qti_pools_num * sizeof(qpi));
335                 qti->qti_pools = pools;
336                 qti->qti_pools_num *= 2;
337         }
338
339         qpi_getref(qpi);
340         /* Take this to protect pool's lqes against changing by
341          * recalculation thread. This would be unlocked at
342          * qti_pools_fini. */
343         down_read(&qpi->qpi_recalc_sem);
344         if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
345                 pools[qti->qti_pools_cnt++] = pools[0];
346                 /* Store global pool always at index 0 */
347                 pools[0] = qpi;
348         } else {
349                 pools[qti->qti_pools_cnt++] = qpi;
350         }
351
352         CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
353                qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
354
355         return 0;
356 }
357
358 static inline void qti_pools_fini(const struct lu_env *env)
359 {
360         struct qmt_thread_info  *qti = qmt_info(env);
361         struct qmt_pool_info    **pools = qti->qti_pools;
362         int i;
363
364         LASSERT(qti->qti_pools_cnt > 0);
365
366         pools = qti_pools(qti);
367         for (i = 0; i < qti->qti_pools_cnt; i++) {
368                 up_read(&pools[i]->qpi_recalc_sem);
369                 qpi_putref(env, pools[i]);
370         }
371
372         if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
373                 OBD_FREE(qti->qti_pools,
374                          qti->qti_pools_num * sizeof(struct qmt_pool_info *));
375 }
376
377 /*
378  * Look-up a pool in a list based on the type.
379  *
380  * \param env   - is the environment passed by the caller
381  * \param qmt   - is the quota master target
382  * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
383  *                    LQUOTA_RES_DT.
384  * \param pool_name - is the pool name to search for
385  * \param idx   - OST or MDT index to search for. When it is >= 0, function
386  *              returns array with pointers to all pools that include
387  *              targets with requested index.
388  * \param add   - add to qti_pool_arr if true
389  */
390 struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
391                                              struct qmt_device *qmt,
392                                              int rtype,
393                                              char *pool_name,
394                                              int idx, bool add)
395 {
396         struct qmt_pool_info    *pos, *pool;
397         int rc = 0;
398         ENTRY;
399
400         down_read(&qmt->qmt_pool_lock);
401         if (list_empty(&qmt->qmt_pool_list)) {
402                 up_read(&qmt->qmt_pool_lock);
403                 RETURN(ERR_PTR(-ENOENT));
404         }
405
406         CDEBUG(D_QUOTA, "type %d name %s index %d\n",
407                rtype, pool_name ?: "<none>", idx);
408         /* Now just find a pool with correct type in a list. Further we need
409          * to go through the list and find a pool that includes requested OST
410          * or MDT. Possibly this would return a list of pools that includes
411          * needed target(OST/MDT). */
412         pool = NULL;
413         if (idx == -1 && !pool_name)
414                 pool_name = GLB_POOL_NAME;
415
416         list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
417                 if (pos->qpi_rtype != rtype)
418                         continue;
419
420                 if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
421                         rc = qti_pools_add(env, pos);
422                         if (rc)
423                                 break;
424                         continue;
425                 }
426
427                 if (pool_name && !strncmp(pool_name, pos->qpi_name,
428                                           LOV_MAXPOOLNAME)) {
429                         pool = pos;
430                         if (add) {
431                                 rc = qti_pools_add(env, pos);
432                                 if (rc)
433                                         break;
434                         } else {
435                                 qpi_getref(pool);
436                         }
437                         break;
438                 }
439         }
440         up_read(&qmt->qmt_pool_lock);
441
442         if (rc)
443                 GOTO(out_err, rc);
444
445         if (idx >= 0 && qti_pools_cnt(env))
446                 pool = qti_pools_env(env)[0];
447
448         RETURN(pool ? : ERR_PTR(-ENOENT));
449 out_err:
450         CERROR("%s: cannot add pool %s: err = %d\n",
451                 qmt->qmt_svname, pos->qpi_name, rc);
452         return ERR_PTR(rc);
453 }
454
455 /*
456  * Functions implementing the pool API, used by the qmt handlers
457  */
458
459 /*
460  * Destroy all pools which are still in the pool list.
461  *
462  * \param env - is the environment passed by the caller
463  * \param qmt - is the quota master target
464  *
465  */
466 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
467 {
468         struct qmt_pool_info *pool, *tmp;
469         ENTRY;
470
471         /* parse list of pool and destroy each element */
472         list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
473                 /* stop all recalc threads - it may hold qpi reference */
474                 qmt_stop_pool_recalc(pool);
475                 /* release extra reference taken in qmt_pool_alloc */
476                 qpi_putref(env, pool);
477         }
478         LASSERT(list_empty(&qmt->qmt_pool_list));
479
480         EXIT;
481 }
482
483 /*
484  * Initialize pool configure for the quota master target. For now, we only
485  * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
486  * pool which are instantiated in this function.
487  *
488  * \param env - is the environment passed by the caller
489  * \param qmt - is the quota master target for which we have to initialize the
490  *              pool configuration
491  *
492  * \retval - 0 on success, appropriate error on failure
493  */
494 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
495 {
496         int     res, rc = 0;
497         ENTRY;
498
499         INIT_LIST_HEAD(&qmt->qmt_pool_list);
500         init_rwsem(&qmt->qmt_pool_lock);
501
502         /* Instantiate pool master for the default data and metadata pool.
503          * This code will have to be revisited once we support quota on
504          * non-default pools */
505         for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
506                 rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
507                 if (rc)
508                         break;
509         }
510
511         if (rc)
512                 qmt_pool_fini(env, qmt);
513
514         RETURN(rc);
515 }
516
517 static int qmt_slv_add(const struct lu_env *env, struct lu_fid *glb_fid,
518                        char *slv_name, struct lu_fid *slv_fid, void *arg)
519 {
520         struct obd_uuid uuid;
521         struct qmt_pool_info *qpi = arg;
522         int stype, qtype, idx;
523         int rc;
524
525         rc = lquota_extract_fid(glb_fid, NULL, &qtype);
526         LASSERT(!rc);
527
528         obd_str2uuid(&uuid, slv_name);
529         stype = qmt_uuid2idx(&uuid, &idx);
530         if (stype < 0)
531                 return stype;
532
533         CDEBUG(D_QUOTA, "add new idx:%d in %s\n", idx, qpi->qpi_name);
534         rc = qmt_sarr_pool_add(qpi, idx, stype);
535         if (rc && rc != -EEXIST) {
536                 CERROR("%s: can't add idx %d into dt-0x0: rc = %d\n",
537                        qpi->qpi_qmt->qmt_svname, idx, rc);
538                 return rc;
539         }
540
541         /* one more slave */
542         qpi->qpi_slv_nr[stype][qtype]++;
543         CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
544                         slv_name, stype, qtype, qpi->qpi_slv_nr[stype][qtype]);
545
546         return 0;
547 }
548
549 /*
550  * Set up on-disk index files associated with each pool.
551  *
552  * \param env - is the environment passed by the caller
553  * \param qmt - is the quota master target for which we have to initialize the
554  *              pool configuration
555  * \param qmt_root - is the on-disk directory created for the QMT.
556  * \param name - is the pool name that we need to setup. Setup all pools
557  *               in qmt_pool_list when name is NULL.
558  *
559  * \retval - 0 on success, appropriate error on failure
560  */
561 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
562                      struct dt_object *qmt_root, char *name)
563 {
564         struct qmt_thread_info  *qti = qmt_info(env);
565         struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
566         struct qmt_pool_info    *pool;
567         struct dt_device        *dev = NULL;
568         dt_obj_version_t         version;
569         struct list_head        *pos;
570         int                      rc = 0, i, qtype;
571         ENTRY;
572
573         /* iterate over each pool in the list and allocate a quota site for each
574          * one. This involves creating a global index file on disk */
575         list_for_each(pos, &qmt->qmt_pool_list) {
576                 struct dt_object        *obj;
577                 struct lquota_entry     *lqe;
578                 char                    *pool_name;
579                 int                      rtype;
580
581                 pool = list_entry(pos, struct qmt_pool_info,
582                                   qpi_linkage);
583
584                 pool_name = pool->qpi_name;
585                 if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
586                         continue;
587                 rtype = pool->qpi_rtype;
588                 if (dev == NULL)
589                         dev = pool->qpi_qmt->qmt_child;
590
591                 /* allocate directory for this pool */
592                 snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
593                          RES_NAME(rtype), pool_name);
594                 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
595                                                   qti->qti_buf);
596                 if (IS_ERR(obj))
597                         RETURN(PTR_ERR(obj));
598                 pool->qpi_root = obj;
599
600                 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
601                         /* Generating FID of global index in charge of storing
602                          * settings for this quota type */
603                         lquota_generate_fid(&qti->qti_fid, rtype, qtype);
604
605                         /* open/create the global index file for this quota
606                          * type. If name is set, it means we came here from
607                          * qmt_pool_new and can create glb index with a
608                          * local generated FID. */
609                         obj = lquota_disk_glb_find_create(env, dev,
610                                                           pool->qpi_root,
611                                                           &qti->qti_fid,
612                                                           name ? true : false);
613                         if (IS_ERR(obj)) {
614                                 rc = PTR_ERR(obj);
615                                 CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
616                                        qmt->qmt_svname, qtype_name(qtype), rc);
617                                 RETURN(rc);
618                         }
619
620                         pool->qpi_glb_obj[qtype] = obj;
621
622                         version = dt_version_get(env, obj);
623                         /* set default grace time for newly created index */
624                         if (version == 0) {
625                                 rec->qbr_hardlimit = 0;
626                                 rec->qbr_softlimit = 0;
627                                 rec->qbr_granted = 0;
628                                 rec->qbr_time = rtype == LQUOTA_RES_MD ?
629                                         MAX_IQ_TIME : MAX_DQ_TIME;
630
631                                 rc = lquota_disk_write_glb(env, obj, 0, rec);
632                                 if (rc) {
633                                         CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
634                                                qmt->qmt_svname, qtype_name(qtype), rc);
635                                         RETURN(rc);
636                                 }
637
638                                 rc = lquota_disk_update_ver(env, dev, obj, 1);
639                                 if (rc) {
640                                         CERROR("%s: failed to set initial version for %s type: rc = %d\n",
641                                                qmt->qmt_svname, qtype_name(qtype), rc);
642                                         RETURN(rc);
643                                 }
644                         }
645
646                         /* create quota entry site for this quota type */
647                         pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
648                                                                   true, qtype,
649                                                                   &qmt_lqe_ops);
650                         if (IS_ERR(pool->qpi_site[qtype])) {
651                                 rc = PTR_ERR(pool->qpi_site[qtype]);
652                                 CERROR("%s: failed to create site for %s type: rc = %d\n",
653                                        qmt->qmt_svname, qtype_name(qtype), rc);
654                                 RETURN(rc);
655                         }
656
657                         /* count number of slaves which already connected to
658                          * the master in the past */
659                         for (i = 0; i < QMT_STYPE_CNT; i++)
660                                 pool->qpi_slv_nr[i][qtype] = 0;
661
662                         rc = lquota_disk_for_each_slv(env, pool->qpi_root,
663                                                       &qti->qti_fid,
664                                                       qmt_slv_add,
665                                                       pool);
666                         if (rc) {
667                                 CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
668                                        qmt->qmt_svname, qtype_name(qtype), rc);
669                                 RETURN(rc);
670                         }
671
672                         /* Global grace time is stored in quota settings of
673                          * ID 0. */
674                         qti->qti_id.qid_uid = 0;
675
676                         /* look-up quota entry storing grace time */
677                         lqe = lqe_locate(env, pool->qpi_site[qtype],
678                                          &qti->qti_id);
679                         if (IS_ERR(lqe))
680                                 RETURN(PTR_ERR(lqe));
681                         pool->qpi_grace_lqe[qtype] = lqe;
682 #ifdef CONFIG_PROC_FS
683                         /* add procfs file to dump the global index, mostly for
684                          * debugging purpose */
685                         snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
686                                  "glb-%s", qtype_name(qtype));
687                         rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
688                                                 0444, &lprocfs_quota_seq_fops,
689                                                 obj);
690                         if (rc)
691                                 CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
692                                       qmt->qmt_svname, PFID(&qti->qti_fid), rc);
693 #endif
694                 }
695                 set_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags);
696                 if (name)
697                         break;
698         }
699
700         RETURN(0);
701 }
702
703 static int qmt_lgd_extend_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
704                               struct hlist_node *hnode, void *data)
705 {
706         struct lqe_glbl_entry *lqeg_arr, *old_lqeg_arr;
707         struct lquota_entry *lqe;
708         int old_num, rc;
709
710         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
711         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
712         rc = 0;
713
714         CDEBUG(D_QUOTA, "lgd %px\n", lqe->lqe_glbl_data);
715         if (lqe->lqe_glbl_data) {
716                 struct lqe_glbl_data *lgd;
717
718                 old_lqeg_arr = NULL;
719                 mutex_lock(&lqe->lqe_glbl_data_lock);
720                 if (lqe->lqe_glbl_data) {
721                         struct qmt_pool_info *qpi = lqe2qpi(lqe);
722                         int sarr_cnt = qmt_sarr_count(qpi);
723
724                         lgd = lqe->lqe_glbl_data;
725                         if (lgd->lqeg_num_alloc < sarr_cnt) {
726                                 LASSERT((lgd->lqeg_num_alloc + 1) == sarr_cnt);
727
728                                 OBD_ALLOC(lqeg_arr,
729                                           sizeof(struct lqe_glbl_entry) *
730                                           (lgd->lqeg_num_alloc + 16));
731                                 if (lqeg_arr) {
732                                         memcpy(lqeg_arr, lgd->lqeg_arr,
733                                                sizeof(struct lqe_glbl_entry) *
734                                                (lgd->lqeg_num_alloc));
735                                         old_lqeg_arr = lgd->lqeg_arr;
736                                         old_num = lgd->lqeg_num_alloc;
737                                         lgd->lqeg_arr = lqeg_arr;
738                                         lgd->lqeg_num_alloc += 16;
739                                         CDEBUG(D_QUOTA,
740                                                "extend lqeg_arr:%px from %d to %d\n",
741                                                lgd, old_num,
742                                                lgd->lqeg_num_alloc);
743                                 } else {
744                                         CERROR("%s: cannot allocate new lqeg_arr: rc = %d\n",
745                                                qpi->qpi_qmt->qmt_svname,
746                                                -ENOMEM);
747                                         GOTO(out, rc = -ENOMEM);
748                                 }
749                         }
750                         lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx =
751                                 qmt_sarr_get_idx(qpi, sarr_cnt - 1);
752                         lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot =
753                                 lqe->lqe_edquot;
754                         lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit =
755                                 lqe->lqe_qunit;
756                         lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot_nu = 0;
757                         lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit_nu = 0;
758                         LQUOTA_DEBUG(lqe, "add tgt idx:%d used %d alloc %d\n",
759                                      lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx,
760                                      lgd->lqeg_num_used, lgd->lqeg_num_alloc);
761                         lgd->lqeg_num_used++;
762                 }
763 out:
764                 mutex_unlock(&lqe->lqe_glbl_data_lock);
765                 OBD_FREE(old_lqeg_arr, old_num * sizeof(struct lqe_glbl_entry));
766         }
767
768         return rc;
769 }
770
771 /*
772  * Handle new slave connection. Called when a slave enqueues the global quota
773  * lock at the beginning of the reintegration procedure.
774  *
775  * \param env - is the environment passed by the caller
776  * \parap qmt - is the quota master target handling this request
777  * \param glb_fid - is the fid of the global index file
778  * \param slv_fid - is the fid of the newly created slave index file
779  * \param slv_ver - is the current version of the slave index file
780  * \param uuid    - is the uuid of slave which is (re)connecting to the master
781  *                  target
782  *
783  * \retval - 0 on success, appropriate error on failure
784  */
785 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
786                       struct lu_fid *glb_fid, struct lu_fid *slv_fid,
787                       __u64 *slv_ver, struct obd_uuid *uuid)
788 {
789         struct qmt_pool_info    *pool;
790         struct dt_object        *slv_obj;
791         int                      pool_type, qtype, stype;
792         bool                     created = false;
793         int                      idx, i, rc = 0;
794
795         stype = qmt_uuid2idx(uuid, &idx);
796         if (stype < 0)
797                 RETURN(stype);
798         CDEBUG(D_QUOTA, "FID "DFID"\n", PFID(glb_fid));
799
800         /* extract pool info from global index FID */
801         rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
802         if (rc)
803                 RETURN(rc);
804
805         pool = qmt_pool_lookup_glb(env, qmt, pool_type);
806         if (IS_ERR(pool))
807                 RETURN(PTR_ERR(pool));
808
809         /* look-up slave index file */
810         slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
811                                        glb_fid, uuid);
812         if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
813                 /* create slave index file */
814                 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
815                                                       pool->qpi_root, glb_fid,
816                                                       uuid, false);
817                 created = true;
818         }
819         if (IS_ERR(slv_obj)) {
820                 rc = PTR_ERR(slv_obj);
821                 CERROR("%s: failed to create quota slave index file for %s (%d)"
822                        "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
823                 GOTO(out, rc);
824         }
825
826         /* retrieve slave fid & current object version */
827         memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
828         *slv_ver = dt_version_get(env, slv_obj);
829         dt_object_put(env, slv_obj);
830         if (created) {
831                 struct qmt_pool_info *ptr;
832
833                 CDEBUG(D_QUOTA, "add tgt idx:%d pool_type:%d qtype:%d stype:%d\n",
834                        idx, pool_type, qtype, stype);
835
836                 if (!qmt_dom(qtype, stype)) {
837                         qmt_sarr_write_down(pool);
838                         rc = qmt_sarr_pool_add_locked(pool, idx, stype);
839                         if (!rc) {
840                                 for (i = 0; i < LL_MAXQUOTAS; i++)
841                                         cfs_hash_for_each(pool->qpi_site[i]->
842                                                           lqs_hash,
843                                                           qmt_lgd_extend_cb,
844                                                           &env);
845                         } else if (rc == -EEXIST) {
846                                 /* This target has been already added
847                                  * by another qtype
848                                  */
849                                 rc = 0;
850                         }
851                         qmt_sarr_write_up(pool);
852
853                         if (rc) {
854                                 CERROR("%s: cannot add idx:%d to pool %s: rc = %d\n",
855                                        qmt->qmt_svname, idx,
856                                        pool->qpi_name, rc);
857                                 GOTO(out, rc);
858                         }
859                 }
860
861                 /* look-up pool in charge of this global index FID */
862                 qti_pools_init(env);
863                 ptr = qmt_pool_lookup_arr(env, qmt, pool_type, idx, stype);
864                 if (IS_ERR(ptr))
865                         GOTO(out, rc = PTR_ERR(ptr));
866
867                 for (i = 0; i < qti_pools_cnt(env); i++)
868                         qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
869
870                 qti_pools_fini(env);
871         }
872
873 out:
874         qpi_putref(env, pool);
875         RETURN(rc);
876 }
877
878 /*
879  * Look-up a lquota_entry in the pool hash and allocate it if not found.
880  *
881  * \param env - is the environment passed by the caller
882  * \param qmt - is the quota master target for which we have to initialize the
883  *              pool configuration
884  * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
885  * \param qtype     - is the quota type, either user or group.
886  * \param qid       - is the quota ID to look-up
887  *
888  * \retval - valid pointer to lquota entry on success, appropriate error on
889  *           failure
890  */
891 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
892                                          struct qmt_device *qmt,
893                                          int pool_type, int qtype,
894                                          union lquota_id *qid,
895                                          char *pool_name)
896 {
897         struct qmt_pool_info    *pool;
898         struct lquota_entry     *lqe;
899         ENTRY;
900
901         /* look-up pool responsible for this global index FID */
902         pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
903         if (IS_ERR(pool))
904                 RETURN(ERR_CAST(pool));
905
906         if (qid->qid_uid == 0) {
907                 /* caller wants to access grace time, no need to look up the
908                  * entry since we keep a reference on ID 0 all the time */
909                 lqe = pool->qpi_grace_lqe[qtype];
910                 lqe_getref(lqe);
911                 GOTO(out, lqe);
912         }
913
914         /* now that we have the pool, let's look-up the quota entry in the
915          * right quota site */
916         lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
917 out:
918         qpi_putref(env, pool);
919         RETURN(lqe);
920 }
921
922 int qmt_pool_lqes_lookup(const struct lu_env *env,
923                          struct qmt_device *qmt,
924                          int rtype, int stype,
925                          int qtype, union lquota_id *qid,
926                          char *pool_name, int idx)
927 {
928         struct qmt_pool_info    *pool;
929         struct lquota_entry     *lqe;
930         int rc, i;
931         ENTRY;
932
933         qti_pools_init(env);
934         rc = 0;
935         /* look-up pool responsible for this global index FID */
936         pool = qmt_pool_lookup_arr(env, qmt, rtype, idx, stype);
937         if (IS_ERR(pool)) {
938                 qti_pools_fini(env);
939                 RETURN(PTR_ERR(pool));
940         }
941
942         /* now that we have the pool, let's look-up the quota entry in the
943          * right quota site */
944         qti_lqes_init(env);
945         for (i = 0; i < qti_pools_cnt(env); i++) {
946                 pool = qti_pools_env(env)[i];
947                 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
948                 if (IS_ERR(lqe)) {
949                         qti_lqes_fini(env);
950                         GOTO(out, rc = PTR_ERR(lqe));
951                 }
952                 qti_lqes_add(env, lqe);
953         }
954         LASSERT(qti_lqes_glbl(env)->lqe_is_global);
955
956 out:
957         qti_pools_fini(env);
958         RETURN(rc);
959 }
960
961 static int lqes_cmp(const void *arg1, const void *arg2)
962 {
963         const struct lquota_entry *lqe1, *lqe2;
964
965         lqe1 = *(const struct lquota_entry **)arg1;
966         lqe2 = *(const struct lquota_entry **)arg2;
967         if (lqe1->lqe_qunit > lqe2->lqe_qunit)
968                 return 1;
969         if (lqe1->lqe_qunit < lqe2->lqe_qunit)
970                 return -1;
971         return 0;
972 }
973
974 void qmt_lqes_sort(const struct lu_env *env)
975 {
976         sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
977         /* global lqe was moved during sorting */
978         if (!qti_lqes_glbl(env)->lqe_is_global) {
979                 int i;
980                 for (i = 0; i < qti_lqes_cnt(env); i++) {
981                         if (qti_lqes(env)[i]->lqe_is_global) {
982                                 qti_glbl_lqe_idx(env) = i;
983                                 break;
984                         }
985                 }
986         }
987 }
988
989 int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
990                               int rtype, int qtype, union lquota_id *qid)
991 {
992         struct qmt_pool_info    *pos;
993         struct lquota_entry     *lqe;
994
995         qti_lqes_init(env);
996         down_read(&qmt->qmt_pool_lock);
997         if (list_empty(&qmt->qmt_pool_list)) {
998                 up_read(&qmt->qmt_pool_lock);
999                 RETURN(-ENOENT);
1000         }
1001
1002         list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
1003                 if (pos->qpi_rtype != rtype)
1004                         continue;
1005                 /* Don't take into account pools without slaves */
1006                 if (!qpi_slv_nr(pos, qtype))
1007                         continue;
1008                 lqe = lqe_find(env, pos->qpi_site[qtype], qid);
1009                 /* ENOENT is valid case for lqe from non global pool
1010                  * that hasn't limits, i.e. not enforced. Continue even
1011                  * in case of error - we can handle already found lqes */
1012                 if (IS_ERR(lqe))
1013                         continue;
1014                 if (!lqe->lqe_enforced) {
1015                         /* no settings for this qid_uid */
1016                         lqe_putref(lqe);
1017                         continue;
1018                 }
1019                 qti_lqes_add(env, lqe);
1020                 CDEBUG(D_QUOTA, "adding lqe %px from pool %s\n",
1021                                  lqe, pos->qpi_name);
1022         }
1023         up_read(&qmt->qmt_pool_lock);
1024         RETURN(qti_lqes_cnt(env) ? 0 : -ENOENT);
1025 }
1026
1027 /**
1028  * Allocate a new pool for the specified device.
1029  *
1030  * Allocate a new pool_desc structure for the specified \a new_pool
1031  * device to create a pool with the given \a poolname.  The new pool
1032  * structure is created with a single reference, and is freed when the
1033  * reference count drops to zero.
1034  *
1035  * \param[in] obd       Lustre OBD device on which to add a pool iterator
1036  * \param[in] poolname  the name of the pool to be created
1037  *
1038  * \retval              0 in case of success
1039  * \retval              negative error code in case of error
1040  */
1041 int qmt_pool_new(struct obd_device *obd, char *poolname)
1042 {
1043         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1044         struct qmt_pool_info *qpi;
1045         struct lu_env env;
1046         int rc;
1047         ENTRY;
1048
1049         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1050                 RETURN(-ENAMETOOLONG);
1051
1052         rc = lu_env_init(&env, LCT_MD_THREAD);
1053         if (rc) {
1054                 CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
1055                 RETURN(rc);
1056         }
1057
1058         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1059         if (!IS_ERR(qpi)) {
1060                 /* Valid case when several MDTs are mounted
1061                  * at the same node. */
1062                 CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
1063                 qpi_putref(&env, qpi);
1064                 GOTO(out_env, rc = -EEXIST);
1065         }
1066         if (PTR_ERR(qpi) != -ENOENT) {
1067                 CWARN("%s: pool %s lookup failed: rc = %ld\n",
1068                       obd->obd_name, poolname, PTR_ERR(qpi));
1069                 GOTO(out_env, rc = PTR_ERR(qpi));
1070         }
1071
1072         /* Now allocate and prepare only DATA pool.
1073          * Further when MDT pools will be ready we need to add
1074          * a cycle here and setup pools of both types. Another
1075          * approach is to find out pool of which type should be
1076          * created. */
1077         rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
1078         if (rc) {
1079                 CERROR("%s: can't alloc pool %s: rc = %d\n",
1080                        obd->obd_name, poolname, rc);
1081                 GOTO(out_env, rc);
1082         }
1083
1084         rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
1085         if (rc) {
1086                 CERROR("%s: can't prepare pool for %s: rc = %d\n",
1087                        obd->obd_name, poolname, rc);
1088                 GOTO(out_err, rc);
1089         }
1090
1091         CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
1092                poolname);
1093
1094         GOTO(out_env, rc);
1095 out_err:
1096         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1097         if (!IS_ERR(qpi)) {
1098                 qpi_putref(&env, qpi);
1099                 qpi_putref(&env, qpi);
1100         }
1101 out_env:
1102         lu_env_fini(&env);
1103         return rc;
1104 }
1105
1106 static int
1107 qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
1108                struct lquota_site *site)
1109 {
1110         struct qmt_thread_info *qti = qmt_info(env);
1111         union lquota_id *qid = &qti->qti_id;
1112         const struct dt_it_ops *iops;
1113         struct dt_key *key;
1114         struct dt_it *it;
1115         __u64 granted;
1116         int rc;
1117         ENTRY;
1118
1119         iops = &obj->do_index_ops->dio_it;
1120
1121         it = iops->init(env, obj, 0);
1122         if (IS_ERR(it)) {
1123                 CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
1124                       PFID(&qti->qti_fid), PTR_ERR(it));
1125                 RETURN(PTR_ERR(it));
1126         }
1127
1128         rc = iops->load(env, it, 0);
1129         if (rc < 0) {
1130                 CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
1131                       PFID(&qti->qti_fid), rc);
1132                 GOTO(out, rc);
1133         } else if (rc == 0) {
1134                 rc = iops->next(env, it);
1135                 if (rc != 0)
1136                         GOTO(out, rc = (rc < 0) ? rc : 0);
1137         }
1138
1139         do {
1140                 struct lquota_entry *lqe;
1141
1142                 key = iops->key(env, it);
1143                 if (IS_ERR(key)) {
1144                         CWARN("quota: error key for "DFID": rc = %ld\n",
1145                               PFID(&qti->qti_fid), PTR_ERR(key));
1146                         GOTO(out, rc = PTR_ERR(key));
1147                 }
1148
1149                 /* skip the root user/group */
1150                 if (*((__u64 *)key) == 0)
1151                         goto next;
1152
1153                 qid->qid_uid = *((__u64 *)key);
1154
1155                 rc = qmt_slv_read(env, qid, obj, &granted);
1156                 if (!granted)
1157                         goto next;
1158
1159                 lqe = lqe_locate(env, site, qid);
1160                 if (IS_ERR(lqe))
1161                         GOTO(out, rc = PTR_ERR(lqe));
1162                 lqe_write_lock(lqe);
1163                 lqe->lqe_recalc_granted += granted;
1164                 lqe_write_unlock(lqe);
1165                 lqe_putref(lqe);
1166 next:
1167                 rc = iops->next(env, it);
1168                 if (rc < 0)
1169                         CWARN("quota: failed to parse index "DFID
1170                               ", ->next error: rc = %d\n",
1171                               PFID(&qti->qti_fid), rc);
1172         } while (rc == 0 && !kthread_should_stop());
1173
1174 out:
1175         iops->put(env, it);
1176         iops->fini(env, it);
1177         RETURN(rc);
1178 }
1179
1180 static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1181                               struct hlist_node *hnode, void *data)
1182 {
1183         struct lquota_entry     *lqe;
1184         struct lu_env *env = data;
1185
1186         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
1187         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
1188
1189         lqe_write_lock(lqe);
1190         if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
1191                 struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
1192                 struct thandle *th;
1193                 bool need_notify = false;
1194                 int rc;
1195
1196                 LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
1197                              lqe->lqe_recalc_granted);
1198                 lqe->lqe_granted = lqe->lqe_recalc_granted;
1199                 /* Always returns true, if there is no slaves in a pool */
1200                 need_notify |= qmt_adjust_qunit(env, lqe);
1201                 need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
1202                 if (need_notify) {
1203                         /* Find all lqes with lqe_id to reseed lgd array */
1204                         rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
1205                                                 lqe_qtype(lqe), &lqe->lqe_id);
1206                         if (!rc) {
1207                                 struct lquota_entry *lqeg = qti_lqes_glbl(env);
1208
1209                                 mutex_lock(&lqeg->lqe_glbl_data_lock);
1210                                 if (lqeg->lqe_glbl_data &&
1211                                     qti_lqes_cnt(env) > 0)
1212                                         qmt_seed_glbe(env, lqeg->lqe_glbl_data,
1213                                                       true);
1214                                 mutex_unlock(&lqeg->lqe_glbl_data_lock);
1215                                 qmt_id_lock_notify(qmt, lqeg);
1216                         }
1217                         qti_lqes_fini(env);
1218                 }
1219                 th = dt_trans_create(env, qmt->qmt_child);
1220                 if (IS_ERR(th))
1221                         goto out;
1222
1223                 rc = lquota_disk_declare_write(env, th,
1224                                                LQE_GLB_OBJ(lqe),
1225                                                &lqe->lqe_id);
1226                 if (rc)
1227                         GOTO(out_stop, rc);
1228
1229                 rc = dt_trans_start_local(env, qmt->qmt_child, th);
1230                 if (rc)
1231                         GOTO(out_stop, rc);
1232
1233                 qmt_glb_write(env, th, lqe, 0, NULL);
1234 out_stop:
1235                 dt_trans_stop(env, qmt->qmt_child, th);
1236         }
1237 out:
1238         lqe->lqe_recalc_granted = 0;
1239         lqe_write_unlock(lqe);
1240
1241         return 0;
1242 }
1243
1244 #define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
1245 static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
1246 {
1247         char mdt_name[MDT_DEV_NAME_LEN];
1248         struct lustre_mount_info *lmi;
1249         struct obd_device *obd;
1250         int rc;
1251         ENTRY;
1252
1253         rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
1254         if (rc) {
1255                 CERROR("quota: cannot get server name from %s: rc = %d\n",
1256                        qmt->qmt_svname, rc);
1257                 RETURN(ERR_PTR(rc));
1258         }
1259
1260         strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
1261         lmi = server_get_mount(mdt_name);
1262         if (lmi == NULL) {
1263                 rc = -ENOENT;
1264                 CERROR("%s: cannot get mount info from %s: rc = %d\n",
1265                        qmt->qmt_svname, mdt_name, rc);
1266                 RETURN(ERR_PTR(rc));
1267         }
1268         obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
1269         lustre_put_lsi(lmi->lmi_sb);
1270
1271         RETURN(obd);
1272 }
1273
1274 static int qmt_pool_recalc(void *args)
1275 {
1276         struct qmt_pool_info *pool, *glbl_pool;
1277         struct obd_device *obd;
1278         struct lu_env env;
1279         int i, rc, qtype, slaves_cnt;
1280         bool sem = false;
1281         ENTRY;
1282
1283         pool = args;
1284
1285         rc = lu_env_init(&env, LCT_MD_THREAD);
1286         if (rc) {
1287                 CERROR("%s: cannot init env: rc = %d\n",
1288                        pool->qpi_qmt->qmt_svname, rc);
1289                 GOTO(out, rc);
1290         }
1291
1292         obd = qmt_get_mgc(pool->qpi_qmt);
1293         if (IS_ERR(obd))
1294                 GOTO(out, rc = PTR_ERR(obd));
1295
1296         /* Waiting for the end of processing mgs config.
1297          * It is needed to be sure all pools are configured.
1298          */
1299         while (obd->obd_process_conf)
1300                 schedule_timeout_uninterruptible(cfs_time_seconds(1));
1301
1302         CFS_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val);
1303         qmt_sarr_read_down(pool);
1304         sem = true;
1305         /* Hold this to be sure that OSTs from this pool
1306          * can't do acquire/release.
1307          *
1308          * I guess below write semaphore could be a bottleneck
1309          * as qmt_dqacq would be blocked trying to hold
1310          * read_lock at qmt_pool_lookup->qti_pools_add.
1311          * But on the other hand adding/removing OSTs to the pool is
1312          * a rare operation. If finally this would be a problem,
1313          * we can consider another approach. For example we can
1314          * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
1315          * and go through appropriate OSTs. I don't use this approach now
1316          * as newly created pool hasn't lqes entries. So firstly we need
1317          * to get this lqes from the global pool index file. This
1318          * solution looks more complex, so leave it as it is. */
1319         down_write(&pool->qpi_recalc_sem);
1320
1321         glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
1322         if (IS_ERR(glbl_pool))
1323                 GOTO(out, rc = PTR_ERR(glbl_pool));
1324
1325         slaves_cnt = qmt_sarr_count(pool);
1326         CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
1327                slaves_cnt, pool->qpi_name);
1328
1329         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1330                 for (i = 0; i < slaves_cnt; i++) {
1331                         struct qmt_thread_info  *qti = qmt_info(&env);
1332                         struct dt_object *slv_obj;
1333                         struct obd_uuid uuid;
1334                         int idx;
1335
1336                         if (kthread_should_stop())
1337                                 GOTO(out_stop, rc = 0);
1338                         idx = qmt_sarr_get_idx(pool, i);
1339                         LASSERT(idx >= 0);
1340
1341                         /* We don't need fsname here - anyway
1342                          * lquota_disk_slv_filename ignores it. */
1343                         snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1344                         lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1345                                             qtype);
1346                         /* look-up index file associated with acquiring slave */
1347                         slv_obj = lquota_disk_slv_find(&env,
1348                                                 glbl_pool->qpi_qmt->qmt_child,
1349                                                 glbl_pool->qpi_root,
1350                                                 &qti->qti_fid,
1351                                                 &uuid);
1352                         if (IS_ERR(slv_obj))
1353                                 GOTO(out_stop, rc = PTR_ERR(slv_obj));
1354
1355                         CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
1356                                slv_obj, uuid.uuid);
1357                         qmt_obj_recalc(&env, slv_obj, pool->qpi_site[qtype]);
1358                         dt_object_put(&env, slv_obj);
1359                 }
1360                 /* Now go trough the site hash and compare lqe_granted
1361                  * with lqe_calc_granted. Write new value if disagree */
1362
1363                 cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
1364                                   qmt_site_recalc_cb, &env);
1365         }
1366         GOTO(out_stop, rc);
1367 out_stop:
1368         qpi_putref(&env, glbl_pool);
1369 out:
1370         if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
1371                 /*
1372                  * Someone is waiting for us to stop - be sure not to exit
1373                  * before kthread_stop() gets a ref on the task.  No event
1374                  * will happen on 'pool, this is just a convenient way to
1375                  * wait.
1376                  */
1377                 wait_var_event(pool, kthread_should_stop());
1378
1379         clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
1380         /* Pool can't be changed, since sem has been down.
1381          * Thus until up_read, no one can restart recalc thread.
1382          */
1383         if (sem) {
1384                 qmt_sarr_read_up(pool);
1385                 up_write(&pool->qpi_recalc_sem);
1386         }
1387
1388         /* qpi_getref has been called in qmt_start_pool_recalc,
1389          * however we can't call qpi_putref if lu_env_init failed.
1390          */
1391         if (env.le_ctx.lc_state == LCS_ENTERED) {
1392                 qpi_putref(&env, pool);
1393                 lu_env_fini(&env);
1394         }
1395
1396         return rc;
1397 }
1398
1399 static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
1400 {
1401         struct task_struct *task;
1402         int rc = 0;
1403
1404         if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
1405                 LASSERT(!qpi->qpi_recalc_task);
1406
1407                 qpi_getref(qpi);
1408                 task = kthread_create(qmt_pool_recalc, qpi,
1409                                       "qsd_reint_%s", qpi->qpi_name);
1410                 if (IS_ERR(task)) {
1411                         clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
1412                         rc = PTR_ERR(task);
1413                         qpi_putref(env, qpi);
1414                 } else {
1415                         qpi->qpi_recalc_task = task;
1416                         /* Using park/unpark to start the thread ensures that
1417                          * the thread function does get calls, so the
1418                          * ref on qpi will be dropped
1419                          */
1420                         kthread_park(task);
1421                         kthread_unpark(task);
1422                 }
1423         }
1424
1425         RETURN(rc);
1426 }
1427
1428 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
1429 {
1430         struct task_struct *task;
1431
1432         task = xchg(&qpi->qpi_recalc_task, NULL);
1433         if (task)
1434                 kthread_stop(task);
1435 }
1436
1437 static int qmt_pool_slv_nr_change(const struct lu_env *env,
1438                                   struct qmt_pool_info *pool,
1439                                   int idx, bool add)
1440 {
1441         struct qmt_pool_info *glbl_pool;
1442         int qtype;
1443
1444         glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
1445         if (IS_ERR(glbl_pool))
1446                 RETURN(PTR_ERR(glbl_pool));
1447
1448         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1449                 struct qmt_thread_info  *qti = qmt_info(env);
1450                 struct dt_object *slv_obj;
1451                 struct obd_uuid uuid;
1452
1453                 /* We don't need fsname here - anyway
1454                  * lquota_disk_slv_filename ignores it. */
1455                 snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1456                 lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1457                                     qtype);
1458                 /* look-up index file associated with acquiring slave */
1459                 slv_obj = lquota_disk_slv_find(env,
1460                                         glbl_pool->qpi_qmt->qmt_child,
1461                                         glbl_pool->qpi_root,
1462                                         &qti->qti_fid,
1463                                         &uuid);
1464                 if (IS_ERR(slv_obj))
1465                         continue;
1466
1467                 if (add)
1468                         pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
1469                 else
1470                         pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
1471                 dt_object_put(env, slv_obj);
1472         }
1473         qpi_putref(env, glbl_pool);
1474
1475         return 0;
1476 }
1477
1478 static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
1479                             char *slavename, bool add)
1480 {
1481         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1482         struct qmt_pool_info    *qpi;
1483         struct lu_env            env;
1484         int                      rc, idx;
1485         ENTRY;
1486
1487         if (qmt->qmt_stopping)
1488                 RETURN(0);
1489
1490         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1491                 RETURN(-ENAMETOOLONG);
1492
1493         CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
1494                               "%s: pool %s, removing %s\n",
1495               obd->obd_name, poolname, slavename);
1496
1497         rc = server_name2index(slavename, &idx, NULL);
1498         if (rc != LDD_F_SV_TYPE_OST)
1499                 RETURN(-EINVAL);
1500
1501         rc = lu_env_init(&env, LCT_MD_THREAD);
1502         if (rc) {
1503                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1504                 RETURN(rc);
1505         }
1506
1507         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1508         if (IS_ERR(qpi)) {
1509                 CWARN("%s: can't find pool %s: rc = %long\n",
1510                       obd->obd_name, poolname, PTR_ERR(qpi));
1511                 GOTO(out, rc = PTR_ERR(qpi));
1512         }
1513
1514         rc = add ? qmt_sarr_pool_add(qpi, idx, QMT_STYPE_OST) :
1515                    qmt_sarr_pool_rem(qpi, idx);
1516         if (rc) {
1517                 /* message is checked in sanity-quota test_1b */
1518                 CERROR("%s: can't %s %s pool '%s': rc = %d\n",
1519                        obd->obd_name, add ? "add to" : "remove", slavename,
1520                        poolname, rc);
1521                 GOTO(out_putref, rc);
1522         }
1523         qmt_pool_slv_nr_change(&env, qpi, idx, add);
1524         qmt_start_pool_recalc(&env, qpi);
1525
1526 out_putref:
1527         qpi_putref(&env, qpi);
1528 out:
1529         lu_env_fini(&env);
1530         RETURN(rc);
1531 }
1532
1533
1534
1535 /**
1536  * Add a single target device to the named pool.
1537  *
1538  * \param[in] obd       OBD device on which to add the pool
1539  * \param[in] poolname  name of the pool to which to add the target \a slavename
1540  * \param[in] slavename name of the target device to be added
1541  *
1542  * \retval              0 if \a slavename was (previously) added to the pool
1543  * \retval              negative error number on failure
1544  */
1545 int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
1546 {
1547         return qmt_pool_add_rem(obd, poolname, slavename, true);
1548 }
1549
1550 /**
1551  * Remove the named target from the specified pool.
1552  *
1553  * \param[in] obd       OBD device from which to remove \a poolname
1554  * \param[in] poolname  name of the pool to be changed
1555  * \param[in] slavename name of the target to remove from \a poolname
1556  *
1557  * \retval              0 on successfully removing \a slavename from the pool
1558  * \retval              negative number on error (e.g. \a slavename not in pool)
1559  */
1560 int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
1561 {
1562         return qmt_pool_add_rem(obd, poolname, slavename, false);
1563 }
1564
1565 /**
1566  * Remove the named pool from the QMT device.
1567  *
1568  * \param[in] obd       OBD device on which pool was previously created
1569  * \param[in] poolname  name of pool to remove from \a obd
1570  *
1571  * \retval              0 on successfully removing the pool
1572  * \retval              negative error numbers for failures
1573  */
1574 int qmt_pool_del(struct obd_device *obd, char *poolname)
1575 {
1576         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1577         struct qmt_pool_info    *qpi;
1578         struct lu_fid            fid;
1579         char                     buf[LQUOTA_NAME_MAX];
1580         struct lu_env            env;
1581         int                      rc;
1582         int                      qtype;
1583         ENTRY;
1584
1585         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1586                 RETURN(-ENAMETOOLONG);
1587
1588         CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
1589                poolname);
1590
1591         rc = lu_env_init(&env, LCT_MD_THREAD);
1592         if (rc) {
1593                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1594                 RETURN(rc);
1595         }
1596
1597         /* look-up pool in charge of this global index FID */
1598         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1599         if (IS_ERR(qpi)) {
1600                 /* Valid case for several MDTs at the same node -
1601                  * pool removed by the 1st MDT in config */
1602                 CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
1603                 lu_env_fini(&env);
1604                 RETURN(PTR_ERR(qpi));
1605         }
1606
1607         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1608                 lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
1609                 snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
1610                 rc = local_object_unlink(&env, qmt->qmt_child,
1611                                          qpi->qpi_root, buf);
1612                 if (rc)
1613                         CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
1614                               obd->obd_name, buf, poolname, rc);
1615         }
1616
1617         /* put ref from look-up */
1618         qpi_putref(&env, qpi);
1619         /* put last ref to free qpi */
1620         qpi_putref(&env, qpi);
1621
1622         snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
1623                  RES_NAME(LQUOTA_RES_DT), poolname);
1624         rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
1625         if (rc)
1626                 CWARN("%s: cannot unlink dir %s: rc = %d\n",
1627                       obd->obd_name, poolname, rc);
1628
1629         lu_env_fini(&env);
1630         RETURN(0);
1631 }
1632
1633 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
1634 {
1635         return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
1636 }
1637
1638 static inline int
1639 _qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int stype, bool locked)
1640 {
1641         /* We don't have an array for DOM */
1642         if (qmt_dom(qpi->qpi_rtype, stype))
1643                 return 0;
1644
1645         if (locked)
1646                 return lu_tgt_pool_add_locked(&qpi->qpi_sarr.osts, idx, 32);
1647         else
1648                 return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, 32);
1649 }
1650
1651 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
1652 {
1653         return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
1654 }
1655
1656 static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi)
1657 {
1658         if (qpi->qpi_sarr.osts.op_array)
1659                 lu_tgt_pool_free(&qpi->qpi_sarr.osts);
1660 }
1661
1662 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
1663 {
1664         if (qmt_pool_global(qpi))
1665                 return 0;
1666
1667         return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
1668 }
1669
1670 int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
1671 {
1672         LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
1673                  "idx invalid %d op_count %d\n", arr_idx,
1674                  qpi->qpi_sarr.osts.op_count);
1675         return qpi->qpi_sarr.osts.op_array[arr_idx];
1676 }
1677
1678 /* Number of slaves in a pool */
1679 unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
1680 {
1681         return qpi->qpi_sarr.osts.op_count;
1682 }