Whamcloud - gitweb
93d2d444d6fe852c36ab71791198c650ad86ec6e
[fs/lustre-release.git] / lustre / quota / qmt_pool.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * A Quota Master Target has a list(qmt_pool_list) where it stores qmt_pool_info
33  * structures. There is one such structure for each pool managed by the QMT.
34  *
35  * Each pool can have different quota types enforced (typically user & group
36  * quota). A pool is in charge of managing lquota_entry structures for each
37  * quota type. This is done by creating one lquota_entry site per quota
38  * type. A site stores entries in a hash table and read quota settings from disk
39  * when a given ID isn't present in the hash.
40  *
41  * The pool API exported here is the following:
42  * - qmt_pool_init(): initializes the general QMT structures used to manage
43  *                    pools.
44  * - qmt_pool_fini(): frees the structures allocated by qmt_pool_fini().
45  * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46  * - qmt_pool_new_conn(): is used to create a new slave index file.
47  * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
48  *                          a given ID.
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
56
57 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
58 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
59                                     int idx, int min);
60 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
61 static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi);
62 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
63 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
64
65 /*
66  * Static helper functions not used outside the scope of this file
67  */
68
69 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
70 {
71         LASSERT(atomic_read(&pool->qpi_ref) > 1);
72         atomic_dec(&pool->qpi_ref);
73 }
74
75 /* some procfs helpers */
76 static int qpi_state_seq_show(struct seq_file *m, void *data)
77 {
78         struct qmt_pool_info    *pool = m->private;
79         int                      type;
80
81         LASSERT(pool != NULL);
82
83         seq_printf(m, "pool:\n"
84                    "    id: %u\n"
85                    "    type: %s\n"
86                    "    ref: %d\n"
87                    "    least qunit: %lu\n",
88                    0,
89                    RES_NAME(pool->qpi_rtype),
90                    atomic_read(&pool->qpi_ref),
91                    pool->qpi_least_qunit);
92
93         for (type = 0; type < LL_MAXQUOTAS; type++)
94                 seq_printf(m, "    %s:\n"
95                            "        #slv: %d\n"
96                            "        #lqe: %d\n",
97                            qtype_name(type),
98                            qpi_slv_nr(pool, type),
99                     atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
100
101         return 0;
102 }
103 LPROC_SEQ_FOPS_RO(qpi_state);
104
105 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
106 {
107         struct qmt_pool_info    *pool = m->private;
108         LASSERT(pool != NULL);
109
110         seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
111         return 0;
112 }
113
114 static ssize_t
115 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
116                                size_t count, loff_t *off)
117 {
118         struct seq_file *m = file->private_data;
119         struct qmt_pool_info *pool = m->private;
120         long long least_qunit;
121         int qunit, rc;
122
123         LASSERT(pool != NULL);
124
125         /* Not tuneable for inode limit */
126         if (pool->qpi_rtype != LQUOTA_RES_DT)
127                 return -EINVAL;
128
129         rc = kstrtoll_from_user(buffer, count, 0, &least_qunit);
130         if (rc)
131                 return rc;
132
133         /* Miminal qpi_soft_least_qunit */
134         qunit = pool->qpi_least_qunit << 2;
135         /* The value must be power of miminal qpi_soft_least_qunit, see
136          * how the qunit is adjusted in qmt_adjust_qunit(). */
137         while (qunit > 0 && qunit < least_qunit)
138                 qunit <<= 2;
139         if (qunit <= 0)
140                 qunit = INT_MAX & ~3;
141
142         pool->qpi_soft_least_qunit = qunit;
143         return count;
144 }
145 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
146
147 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
148         { .name =       "info",
149           .fops =       &qpi_state_fops },
150         { .name =       "soft_least_qunit",
151           .fops =       &qpi_soft_least_qunit_fops },
152         { NULL }
153 };
154
155 /*
156  * Allocate a new qmt_pool_info structure and add it to qmt_pool_list.
157  *
158  * \param env       - is the environment passed by the caller
159  * \param qmt       - is the quota master target
160  * \param pool_type - is the resource type of this pool instance, either
161  *                    LQUOTA_RES_MD or LQUOTA_RES_DT.
162  *
163  * \retval - 0 on success, appropriate error on failure
164  */
165 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
166                           char *pool_name, int pool_type)
167 {
168         struct qmt_thread_info  *qti = qmt_info(env);
169         struct qmt_pool_info    *pool;
170         int                      rc = 0;
171         ENTRY;
172
173         OBD_ALLOC_PTR(pool);
174         if (pool == NULL)
175                 RETURN(-ENOMEM);
176         INIT_LIST_HEAD(&pool->qpi_linkage);
177         init_waitqueue_head(&pool->qpi_recalc_thread.t_ctl_waitq);
178         thread_set_flags(&pool->qpi_recalc_thread, SVC_STOPPED);
179         init_rwsem(&pool->qpi_recalc_sem);
180
181         pool->qpi_rtype = pool_type;
182
183         /* initialize refcount to 1, hash table will then grab an additional
184          * reference */
185         atomic_set(&pool->qpi_ref, 1);
186
187         /* set up least qunit size to use for this pool */
188         pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
189         if (pool_type == LQUOTA_RES_DT)
190                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
191         else
192                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
193
194         /* grab reference on master target that this pool belongs to */
195         lu_device_get(qmt2lu_dev(qmt));
196         lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
197         pool->qpi_qmt = qmt;
198
199         /* create pool proc directory */
200         snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
201                  RES_NAME(pool_type), pool_name);
202         strncpy(pool->qpi_name, pool_name, QPI_MAXNAME);
203         pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
204                                           lprocfs_quota_qpi_vars, pool);
205         if (IS_ERR(pool->qpi_proc)) {
206                 rc = PTR_ERR(pool->qpi_proc);
207                 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
208                        qmt->qmt_svname, qti->qti_buf, rc);
209                 pool->qpi_proc = NULL;
210                 GOTO(out, rc);
211         }
212
213         rc = qmt_sarr_pool_init(pool);
214         if (rc)
215                 GOTO(out, rc);
216
217         /* add to qmt pool list */
218         down_write(&qmt->qmt_pool_lock);
219         list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
220         up_write(&qmt->qmt_pool_lock);
221         EXIT;
222 out:
223         if (rc)
224                 /* this frees the pool structure since refcount is equal to 1 */
225                 qpi_putref(env, pool);
226         return rc;
227 }
228
229 /*
230  * Delete a qmt_pool_info instance and all structures associated.
231  *
232  * \param env  - is the environment passed by the caller
233  * \param pool - is the qmt_pool_info structure to free
234  */
235 void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
236 {
237         struct  qmt_device *qmt = pool->qpi_qmt;
238         int     qtype;
239         ENTRY;
240
241         /* remove from list */
242         down_write(&qmt->qmt_pool_lock);
243         list_del_init(&pool->qpi_linkage);
244         up_write(&qmt->qmt_pool_lock);
245
246         if (atomic_read(&pool->qpi_ref) > 0)
247                 RETURN_EXIT;
248
249         qmt_stop_pool_recalc(pool);
250         qmt_sarr_pool_free(pool);
251
252         /* release proc entry */
253         if (pool->qpi_proc) {
254                 lprocfs_remove(&pool->qpi_proc);
255                 pool->qpi_proc = NULL;
256         }
257
258         /* release per-quota type site used to manage quota entries as well as
259          * references to global index files */
260         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
261                 /* release lqe storing grace time */
262                 if (pool->qpi_grace_lqe[qtype] != NULL)
263                         lqe_putref(pool->qpi_grace_lqe[qtype]);
264
265                 /* release site */
266                 if (pool->qpi_site[qtype] != NULL &&
267                     !IS_ERR(pool->qpi_site[qtype]))
268                         lquota_site_free(env, pool->qpi_site[qtype]);
269                 /* release reference to global index */
270                 if (pool->qpi_glb_obj[qtype] != NULL &&
271                     !IS_ERR(pool->qpi_glb_obj[qtype]))
272                         dt_object_put(env, pool->qpi_glb_obj[qtype]);
273         }
274
275         /* release reference on pool directory */
276         if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
277                 dt_object_put(env, pool->qpi_root);
278
279         /* release reference on the master target */
280         if (pool->qpi_qmt != NULL) {
281                 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
282
283                 lu_ref_del(&ld->ld_reference, "pool", pool);
284                 lu_device_put(ld);
285                 pool->qpi_qmt = NULL;
286         }
287
288         LASSERT(list_empty(&pool->qpi_linkage));
289         OBD_FREE_PTR(pool);
290 }
291
292 static inline void qti_pools_init(const struct lu_env *env)
293 {
294         struct qmt_thread_info  *qti = qmt_info(env);
295
296         qti->qti_pools_cnt = 0;
297         qti->qti_pools_num = QMT_MAX_POOL_NUM;
298 }
299
300 #define qti_pools(qti)  (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
301                                 qti->qti_pools : qti->qti_pools_small)
302 #define qti_pools_env(env) \
303         (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
304                 qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
305 #define qti_pools_cnt(env)      (qmt_info(env)->qti_pools_cnt)
306
307 static inline int qti_pools_add(const struct lu_env *env,
308                                 struct qmt_pool_info *qpi)
309 {
310         struct qmt_thread_info  *qti = qmt_info(env);
311         struct qmt_pool_info    **pools = qti->qti_pools;
312
313         pools = qti_pools(qti);
314         LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
315                  "Forgot init? %p\n", qti);
316
317         if (qti->qti_pools_cnt > qti->qti_pools_num) {
318                 OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
319                 if (!pools)
320                         return -ENOMEM;
321                 memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
322                 /* Don't need to free, if it is the very 1st allocation */
323                 if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
324                         OBD_FREE(qti->qti_pools,
325                                  qti->qti_pools_num * sizeof(qpi));
326                 qti->qti_pools = pools;
327                 qti->qti_pools_num *= 2;
328         }
329
330         qpi_getref(qpi);
331         /* Take this to protect pool's lqes against changing by
332          * recalculation thread. This would be unlocked at
333          * qti_pools_fini. */
334         down_read(&qpi->qpi_recalc_sem);
335         if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
336                 pools[qti->qti_pools_cnt++] = pools[0];
337                 /* Store global pool always at index 0 */
338                 pools[0] = qpi;
339         } else {
340                 pools[qti->qti_pools_cnt++] = qpi;
341         }
342
343         CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
344                qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
345
346         return 0;
347 }
348
349 static inline void qti_pools_fini(const struct lu_env *env)
350 {
351         struct qmt_thread_info  *qti = qmt_info(env);
352         struct qmt_pool_info    **pools = qti->qti_pools;
353         int i;
354
355         LASSERT(qti->qti_pools_cnt > 0);
356
357         pools = qti_pools(qti);
358         for (i = 0; i < qti->qti_pools_cnt; i++) {
359                 up_read(&pools[i]->qpi_recalc_sem);
360                 qpi_putref(env, pools[i]);
361         }
362
363         if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
364                 OBD_FREE(qti->qti_pools,
365                          qti->qti_pools_num * sizeof(struct qmt_pool_info *));
366 }
367
368 /*
369  * Look-up a pool in a list based on the type.
370  *
371  * \param env   - is the environment passed by the caller
372  * \param qmt   - is the quota master target
373  * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
374  *                    LQUOTA_RES_DT.
375  * \param pool_name - is the pool name to search for
376  * \param idx   - OST or MDT index to search for. When it is >= 0, function
377  *              returns array with pointers to all pools that include
378  *              targets with requested index.
379  * \param add   - add to qti_pool_arr if true
380  */
381 struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
382                                              struct qmt_device *qmt,
383                                              int rtype,
384                                              char *pool_name,
385                                              int idx, bool add)
386 {
387         struct qmt_pool_info    *pos, *pool;
388         int rc;
389         ENTRY;
390
391         down_read(&qmt->qmt_pool_lock);
392         if (list_empty(&qmt->qmt_pool_list)) {
393                 up_read(&qmt->qmt_pool_lock);
394                 RETURN(ERR_PTR(-ENOENT));
395         }
396
397         CDEBUG(D_QUOTA, "type %d name %p index %d\n",
398                rtype, pool_name, idx);
399         /* Now just find a pool with correct type in a list. Further we need
400          * to go through the list and find a pool that includes requested OST
401          * or MDT. Possibly this would return a list of pools that includes
402          * needed target(OST/MDT). */
403         pool = NULL;
404         if (idx == -1 && !pool_name)
405                 pool_name = GLB_POOL_NAME;
406
407         list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
408                 if (pos->qpi_rtype != rtype)
409                         continue;
410
411                 if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
412                         rc = qti_pools_add(env, pos);
413                         if (rc)
414                                 GOTO(out_err, rc);
415                         continue;
416                 }
417
418                 if (pool_name && !strncmp(pool_name, pos->qpi_name,
419                                           LOV_MAXPOOLNAME)) {
420                         pool = pos;
421                         if (add) {
422                                 rc = qti_pools_add(env, pos);
423                                 if (rc)
424                                         GOTO(out_err, rc);
425                         } else {
426                                 qpi_getref(pool);
427                         }
428                         break;
429                 }
430         }
431         up_read(&qmt->qmt_pool_lock);
432
433         if (idx >= 0 && qti_pools_cnt(env))
434                 pool = qti_pools_env(env)[0];
435
436         RETURN(pool ? : ERR_PTR(-ENOENT));
437 out_err:
438         CERROR("%s: cannot add pool %s: err = %d\n",
439                 qmt->qmt_svname, pos->qpi_name, rc);
440         RETURN(ERR_PTR(rc));
441 }
442
443 /*
444  * Functions implementing the pool API, used by the qmt handlers
445  */
446
447 /*
448  * Destroy all pools which are still in the pool list.
449  *
450  * \param env - is the environment passed by the caller
451  * \param qmt - is the quota master target
452  *
453  */
454 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
455 {
456         struct qmt_pool_info *pool, *tmp;
457         ENTRY;
458
459         /* parse list of pool and destroy each element */
460         list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
461                 /* release extra reference taken in qmt_pool_alloc */
462                 qpi_putref(env, pool);
463         }
464         LASSERT(list_empty(&qmt->qmt_pool_list));
465
466         EXIT;
467 }
468
469 /*
470  * Initialize pool configure for the quota master target. For now, we only
471  * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
472  * pool which are instantiated in this function.
473  *
474  * \param env - is the environment passed by the caller
475  * \param qmt - is the quota master target for which we have to initialize the
476  *              pool configuration
477  *
478  * \retval - 0 on success, appropriate error on failure
479  */
480 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
481 {
482         int     res, rc = 0;
483         ENTRY;
484
485         INIT_LIST_HEAD(&qmt->qmt_pool_list);
486         init_rwsem(&qmt->qmt_pool_lock);
487
488         /* Instantiate pool master for the default data and metadata pool.
489          * This code will have to be revisited once we support quota on
490          * non-default pools */
491         for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
492                 rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
493                 if (rc)
494                         break;
495         }
496
497         if (rc)
498                 qmt_pool_fini(env, qmt);
499
500         RETURN(rc);
501 }
502
503 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
504                        char *slv_name, struct lu_fid *slv_fid, void *arg)
505 {
506         struct obd_uuid uuid;
507         int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
508         int stype, qtype;
509         int rc;
510
511         rc = lquota_extract_fid(glb_fid, NULL, &qtype);
512         LASSERT(!rc);
513
514         obd_str2uuid(&uuid, slv_name);
515         stype = qmt_uuid2idx(&uuid, NULL);
516         if (stype < 0)
517                 return stype;
518         /* one more slave */
519         (*nr)[stype][qtype]++;
520         CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
521                         slv_name, stype, qtype, (*nr)[stype][qtype]);
522
523         return 0;
524 }
525
526 /*
527  * Set up on-disk index files associated with each pool.
528  *
529  * \param env - is the environment passed by the caller
530  * \param qmt - is the quota master target for which we have to initialize the
531  *              pool configuration
532  * \param qmt_root - is the on-disk directory created for the QMT.
533  * \param name - is the pool name that we need to setup. Setup all pools
534  *               in qmt_pool_list when name is NULL.
535  *
536  * \retval - 0 on success, appropriate error on failure
537  */
538 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
539                      struct dt_object *qmt_root, char *name)
540 {
541         struct qmt_thread_info  *qti = qmt_info(env);
542         struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
543         struct qmt_pool_info    *pool;
544         struct dt_device        *dev = NULL;
545         dt_obj_version_t         version;
546         struct list_head        *pos;
547         int                      rc = 0, i, qtype;
548         ENTRY;
549
550         /* iterate over each pool in the list and allocate a quota site for each
551          * one. This involves creating a global index file on disk */
552         list_for_each(pos, &qmt->qmt_pool_list) {
553                 struct dt_object        *obj;
554                 struct lquota_entry     *lqe;
555                 char                    *pool_name;
556                 int                      rtype;
557
558                 pool = list_entry(pos, struct qmt_pool_info,
559                                   qpi_linkage);
560
561                 pool_name = pool->qpi_name;
562                 if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
563                         continue;
564                 rtype = pool->qpi_rtype;
565                 if (dev == NULL)
566                         dev = pool->qpi_qmt->qmt_child;
567
568                 /* allocate directory for this pool */
569                 snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
570                          RES_NAME(rtype), pool_name);
571                 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
572                                                   qti->qti_buf);
573                 if (IS_ERR(obj))
574                         RETURN(PTR_ERR(obj));
575                 pool->qpi_root = obj;
576
577                 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
578                         /* Generating FID of global index in charge of storing
579                          * settings for this quota type */
580                         lquota_generate_fid(&qti->qti_fid, rtype, qtype);
581
582                         /* open/create the global index file for this quota
583                          * type. If name is set, it means we came here from
584                          * qmt_pool_new and can create glb index with a
585                          * local generated FID. */
586                         obj = lquota_disk_glb_find_create(env, dev,
587                                                           pool->qpi_root,
588                                                           &qti->qti_fid,
589                                                           name ? true : false);
590                         if (IS_ERR(obj)) {
591                                 rc = PTR_ERR(obj);
592                                 CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
593                                        qmt->qmt_svname, qtype_name(qtype), rc);
594                                 RETURN(rc);
595                         }
596
597                         pool->qpi_glb_obj[qtype] = obj;
598
599                         version = dt_version_get(env, obj);
600                         /* set default grace time for newly created index */
601                         if (version == 0) {
602                                 rec->qbr_hardlimit = 0;
603                                 rec->qbr_softlimit = 0;
604                                 rec->qbr_granted = 0;
605                                 rec->qbr_time = rtype == LQUOTA_RES_MD ?
606                                         MAX_IQ_TIME : MAX_DQ_TIME;
607
608                                 rc = lquota_disk_write_glb(env, obj, 0, rec);
609                                 if (rc) {
610                                         CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
611                                                qmt->qmt_svname, qtype_name(qtype), rc);
612                                         RETURN(rc);
613                                 }
614
615                                 rc = lquota_disk_update_ver(env, dev, obj, 1);
616                                 if (rc) {
617                                         CERROR("%s: failed to set initial version for %s type: rc = %d\n",
618                                                qmt->qmt_svname, qtype_name(qtype), rc);
619                                         RETURN(rc);
620                                 }
621                         }
622
623                         /* create quota entry site for this quota type */
624                         pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
625                                                                   true, qtype,
626                                                                   &qmt_lqe_ops);
627                         if (IS_ERR(pool->qpi_site[qtype])) {
628                                 rc = PTR_ERR(pool->qpi_site[qtype]);
629                                 CERROR("%s: failed to create site for %s type: rc = %d\n",
630                                        qmt->qmt_svname, qtype_name(qtype), rc);
631                                 RETURN(rc);
632                         }
633
634                         /* count number of slaves which already connected to
635                          * the master in the past */
636                         for (i = 0; i < QMT_STYPE_CNT; i++)
637                                 pool->qpi_slv_nr[i][qtype] = 0;
638
639                         rc = lquota_disk_for_each_slv(env, pool->qpi_root,
640                                                       &qti->qti_fid,
641                                                       qmt_slv_cnt,
642                                                       &pool->qpi_slv_nr);
643                         if (rc) {
644                                 CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
645                                        qmt->qmt_svname, qtype_name(qtype), rc);
646                                 RETURN(rc);
647                         }
648
649                         /* Global grace time is stored in quota settings of
650                          * ID 0. */
651                         qti->qti_id.qid_uid = 0;
652
653                         /* look-up quota entry storing grace time */
654                         lqe = lqe_locate(env, pool->qpi_site[qtype],
655                                          &qti->qti_id);
656                         if (IS_ERR(lqe))
657                                 RETURN(PTR_ERR(lqe));
658                         pool->qpi_grace_lqe[qtype] = lqe;
659 #ifdef CONFIG_PROC_FS
660                         /* add procfs file to dump the global index, mostly for
661                          * debugging purpose */
662                         snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
663                                  "glb-%s", qtype_name(qtype));
664                         rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
665                                                 0444, &lprocfs_quota_seq_fops,
666                                                 obj);
667                         if (rc)
668                                 CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
669                                       qmt->qmt_svname, PFID(&qti->qti_fid), rc);
670 #endif
671                 }
672                 if (name)
673                         break;
674         }
675
676         RETURN(0);
677 }
678
679 /*
680  * Handle new slave connection. Called when a slave enqueues the global quota
681  * lock at the beginning of the reintegration procedure.
682  *
683  * \param env - is the environment passed by the caller
684  * \parap qmt - is the quota master target handling this request
685  * \param glb_fid - is the fid of the global index file
686  * \param slv_fid - is the fid of the newly created slave index file
687  * \param slv_ver - is the current version of the slave index file
688  * \param uuid    - is the uuid of slave which is (re)connecting to the master
689  *                  target
690  *
691  * \retval - 0 on success, appropriate error on failure
692  */
693 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
694                       struct lu_fid *glb_fid, struct lu_fid *slv_fid,
695                       __u64 *slv_ver, struct obd_uuid *uuid)
696 {
697         struct qmt_pool_info    *pool;
698         struct dt_object        *slv_obj;
699         int                      pool_type, qtype, stype;
700         bool                     created = false;
701         int                      idx, i, rc = 0;
702
703         stype = qmt_uuid2idx(uuid, &idx);
704         if (stype < 0)
705                 RETURN(stype);
706
707         /* extract pool info from global index FID */
708         rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
709         if (rc)
710                 RETURN(rc);
711
712         /* look-up pool in charge of this global index FID */
713         qti_pools_init(env);
714         pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
715         if (IS_ERR(pool))
716                 RETURN(PTR_ERR(pool));
717
718         /* look-up slave index file */
719         slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
720                                        glb_fid, uuid);
721         if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
722                 /* create slave index file */
723                 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
724                                                       pool->qpi_root, glb_fid,
725                                                       uuid, false);
726                 created = true;
727         }
728         if (IS_ERR(slv_obj)) {
729                 rc = PTR_ERR(slv_obj);
730                 CERROR("%s: failed to create quota slave index file for %s (%d)"
731                        "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
732                 GOTO(out, rc);
733         }
734
735         /* retrieve slave fid & current object version */
736         memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
737         *slv_ver = dt_version_get(env, slv_obj);
738         dt_object_put(env, slv_obj);
739         if (created)
740                 for (i = 0; i < qti_pools_cnt(env); i++)
741                         qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
742 out:
743         qti_pools_fini(env);
744         RETURN(rc);
745 }
746
747 /*
748  * Look-up a lquota_entry in the pool hash and allocate it if not found.
749  *
750  * \param env - is the environment passed by the caller
751  * \param qmt - is the quota master target for which we have to initialize the
752  *              pool configuration
753  * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
754  * \param qtype     - is the quota type, either user or group.
755  * \param qid       - is the quota ID to look-up
756  *
757  * \retval - valid pointer to lquota entry on success, appropriate error on
758  *           failure
759  */
760 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
761                                          struct qmt_device *qmt,
762                                          int pool_type, int qtype,
763                                          union lquota_id *qid,
764                                          char *pool_name)
765 {
766         struct qmt_pool_info    *pool;
767         struct lquota_entry     *lqe;
768         ENTRY;
769
770         /* look-up pool responsible for this global index FID */
771         pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
772         if (IS_ERR(pool))
773                 RETURN(ERR_CAST(pool));
774
775         if (qid->qid_uid == 0) {
776                 /* caller wants to access grace time, no need to look up the
777                  * entry since we keep a reference on ID 0 all the time */
778                 lqe = pool->qpi_grace_lqe[qtype];
779                 lqe_getref(lqe);
780                 GOTO(out, lqe);
781         }
782
783         /* now that we have the pool, let's look-up the quota entry in the
784          * right quota site */
785         lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
786 out:
787         qpi_putref(env, pool);
788         RETURN(lqe);
789 }
790
791 int qmt_pool_lqes_lookup(const struct lu_env *env,
792                          struct qmt_device *qmt,
793                          int rtype, int stype,
794                          int qtype, union lquota_id *qid,
795                          char *pool_name, int idx)
796 {
797         struct qmt_pool_info    *pool;
798         struct lquota_entry     *lqe;
799         int rc, i;
800         ENTRY;
801
802         /* Until MDT pools are not emplemented, all MDTs belong to
803          * global pool, thus lookup lqes only from global pool. */
804         if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
805                 idx = -1;
806
807         qti_pools_init(env);
808         rc = 0;
809         /* look-up pool responsible for this global index FID */
810         pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
811         if (IS_ERR(pool)) {
812                 qti_pools_fini(env);
813                 RETURN(PTR_ERR(pool));
814         }
815
816         /* now that we have the pool, let's look-up the quota entry in the
817          * right quota site */
818         qti_lqes_init(env);
819         for (i = 0; i < qti_pools_cnt(env); i++) {
820                 pool = qti_pools_env(env)[i];
821                 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
822                 if (IS_ERR(lqe)) {
823                         qti_lqes_fini(env);
824                         GOTO(out, rc = PTR_ERR(lqe));
825                 }
826                 /* Only release could be done for not enforced lqe
827                  * (see qmt_dqacq0). However slave could request to
828                  * release more than not global lqe had granted before
829                  * lqe_enforced was cleared. It is legal case,
830                  * because even if current lqe is not enforced,
831                  * lqes from other pools are still active and avilable
832                  * for acquiring. Furthermore, skip not enforced lqe
833                  * to don't make extra allocations. */
834                 /*if (!lqe_is_glbl(lqe) && !lqe->lqe_enforced) {
835                         lqe_putref(lqe);
836                         continue;
837                 }*/
838                 qti_lqes_add(env, lqe);
839         }
840         LASSERT(qti_lqes_glbl(env)->lqe_is_global);
841
842 out:
843         qti_pools_fini(env);
844         RETURN(rc);
845 }
846
847 static int lqes_cmp(const void *arg1, const void *arg2)
848 {
849         const struct lquota_entry *lqe1, *lqe2;
850         lqe1 = arg1;
851         lqe2 = arg2;
852         return lqe1->lqe_qunit > lqe2->lqe_qunit;
853 }
854
855 void qmt_lqes_sort(const struct lu_env *env)
856 {
857         sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
858         /* global lqe was moved during sorting */
859         if (!qti_lqes_glbl(env)->lqe_is_global) {
860                 int i;
861                 for (i = 0; i < qti_lqes_cnt(env); i++) {
862                         if (qti_lqes(env)[i]->lqe_is_global) {
863                                 qti_glbl_lqe_idx(env) = i;
864                                 break;
865                         }
866                 }
867         }
868 }
869
870 int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
871                               int rtype, int qtype, union lquota_id *qid)
872 {
873         struct qmt_pool_info    *pos;
874         struct lquota_entry     *lqe;
875         int rc = 0;
876
877         qti_lqes_init(env);
878         down_read(&qmt->qmt_pool_lock);
879         if (list_empty(&qmt->qmt_pool_list)) {
880                 up_read(&qmt->qmt_pool_lock);
881                 RETURN(-ENOENT);
882         }
883
884         list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
885                 if (pos->qpi_rtype != rtype)
886                         continue;
887                 /* Don't take into account pools without slaves */
888                 if (!qpi_slv_nr(pos, qtype))
889                         continue;
890                 lqe = lqe_find(env, pos->qpi_site[qtype], qid);
891                 /* ENOENT is valid case for lqe from non global pool
892                  * that hasn't limits, i.e. not enforced. Continue even
893                  * in case of error - we can handle already found lqes */
894                 if (IS_ERR_OR_NULL(lqe)) {
895                         /* let know that something went wrong */
896                         rc = lqe ? PTR_ERR(lqe) : -ENOENT;
897                         continue;
898                 }
899                 if (!lqe->lqe_enforced) {
900                         /* no settings for this qid_uid */
901                         lqe_putref(lqe);
902                         continue;
903                 }
904                 qti_lqes_add(env, lqe);
905                 CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
906                                  lqe, pos->qpi_name);
907         }
908         up_read(&qmt->qmt_pool_lock);
909         RETURN(rc);
910 }
911
912 /**
913  * Allocate a new pool for the specified device.
914  *
915  * Allocate a new pool_desc structure for the specified \a new_pool
916  * device to create a pool with the given \a poolname.  The new pool
917  * structure is created with a single reference, and is freed when the
918  * reference count drops to zero.
919  *
920  * \param[in] obd       Lustre OBD device on which to add a pool iterator
921  * \param[in] poolname  the name of the pool to be created
922  *
923  * \retval              0 in case of success
924  * \retval              negative error code in case of error
925  */
926 int qmt_pool_new(struct obd_device *obd, char *poolname)
927 {
928         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
929         struct qmt_pool_info *qpi;
930         struct lu_env env;
931         int rc;
932         ENTRY;
933
934         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
935                 RETURN(-ENAMETOOLONG);
936
937         rc = lu_env_init(&env, LCT_MD_THREAD);
938         if (rc) {
939                 CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
940                 RETURN(rc);
941         }
942
943         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
944         if (!IS_ERR(qpi)) {
945                 /* Valid case when several MDTs are mounted
946                  * at the same node. */
947                 CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
948                 qpi_putref(&env, qpi);
949                 GOTO(out_env, rc = -EEXIST);
950         }
951         if (PTR_ERR(qpi) != -ENOENT) {
952                 CWARN("%s: pool %s lookup failed: rc = %ld\n",
953                       obd->obd_name, poolname, PTR_ERR(qpi));
954                 GOTO(out_env, rc = PTR_ERR(qpi));
955         }
956
957         /* Now allocate and prepare only DATA pool.
958          * Further when MDT pools will be ready we need to add
959          * a cycle here and setup pools of both types. Another
960          * approach is to find out pool of which type should be
961          * created. */
962         rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
963         if (rc) {
964                 CERROR("%s: can't alloc pool %s: rc = %d\n",
965                        obd->obd_name, poolname, rc);
966                 GOTO(out_env, rc);
967         }
968
969         rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
970         if (rc) {
971                 CERROR("%s: can't prepare pool for %s: rc = %d\n",
972                        obd->obd_name, poolname, rc);
973                 GOTO(out_err, rc);
974         }
975
976         CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
977                poolname);
978
979         GOTO(out_env, rc);
980 out_err:
981         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
982         if (!IS_ERR(qpi)) {
983                 qpi_putref(&env, qpi);
984                 qpi_putref(&env, qpi);
985         }
986 out_env:
987         lu_env_fini(&env);
988         return rc;
989 }
990
991 static int
992 qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
993                struct ptlrpc_thread *thread, struct lquota_site *site)
994 {
995         struct qmt_thread_info *qti = qmt_info(env);
996         union lquota_id *qid = &qti->qti_id;
997         const struct dt_it_ops *iops;
998         struct dt_key *key;
999         struct dt_it *it;
1000         __u64 granted;
1001         int rc;
1002         ENTRY;
1003
1004         iops = &obj->do_index_ops->dio_it;
1005
1006         it = iops->init(env, obj, 0);
1007         if (IS_ERR(it)) {
1008                 CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
1009                       PFID(&qti->qti_fid), PTR_ERR(it));
1010                 RETURN(PTR_ERR(it));
1011         }
1012
1013         rc = iops->load(env, it, 0);
1014         if (rc < 0) {
1015                 CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
1016                       PFID(&qti->qti_fid), rc);
1017                 GOTO(out, rc);
1018         } else if (rc == 0) {
1019                 rc = iops->next(env, it);
1020                 if (rc != 0)
1021                         GOTO(out, rc = (rc < 0) ? rc : 0);
1022         }
1023
1024         do {
1025                 struct lquota_entry *lqe;
1026
1027                 key = iops->key(env, it);
1028                 if (IS_ERR(key)) {
1029                         CWARN("quota: error key for "DFID": rc = %ld\n",
1030                               PFID(&qti->qti_fid), PTR_ERR(key));
1031                         GOTO(out, rc = PTR_ERR(key));
1032                 }
1033
1034                 /* skip the root user/group */
1035                 if (*((__u64 *)key) == 0)
1036                         goto next;
1037
1038                 qid->qid_uid = *((__u64 *)key);
1039
1040                 rc = qmt_slv_read(env, qid, obj, &granted);
1041                 if (!granted)
1042                         goto next;
1043
1044                 lqe = lqe_locate(env, site, qid);
1045                 if (IS_ERR(lqe))
1046                         GOTO(out, rc = PTR_ERR(lqe));
1047                 lqe_write_lock(lqe);
1048                 lqe->lqe_recalc_granted += granted;
1049                 lqe_write_unlock(lqe);
1050                 lqe_putref(lqe);
1051 next:
1052                 rc = iops->next(env, it);
1053                 if (rc < 0)
1054                         CWARN("quota: failed to parse index "DFID
1055                               ", ->next error: rc = %d\n",
1056                               PFID(&qti->qti_fid), rc);
1057         } while (rc == 0 && thread_is_running(thread));
1058
1059 out:
1060         iops->put(env, it);
1061         iops->fini(env, it);
1062         RETURN(rc);
1063 }
1064
1065 static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1066                               struct hlist_node *hnode, void *data)
1067 {
1068         struct lquota_entry     *lqe;
1069         struct lu_env *env = data;
1070
1071         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
1072         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
1073
1074         lqe_write_lock(lqe);
1075         if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
1076                 struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
1077                 struct thandle *th;
1078                 bool need_notify = false;
1079                 int rc;
1080
1081                 LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
1082                              lqe->lqe_recalc_granted);
1083                 lqe->lqe_granted = lqe->lqe_recalc_granted;
1084                 /* Always returns true, if there is no slaves in a pool */
1085                 need_notify |= qmt_adjust_qunit(env, lqe);
1086                 need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
1087                 if (need_notify) {
1088                         /* Find all lqes with lqe_id to reseed lgd array */
1089                         rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
1090                                                 lqe_qtype(lqe), &lqe->lqe_id);
1091                         if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
1092                                 qmt_seed_glbe(env,
1093                                         qti_lqes_glbl(env)->lqe_glbl_data);
1094                                 qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
1095                         }
1096                         qti_lqes_fini(env);
1097                 }
1098                 th = dt_trans_create(env, qmt->qmt_child);
1099                 if (IS_ERR(th))
1100                         goto out;
1101
1102                 rc = lquota_disk_declare_write(env, th,
1103                                                LQE_GLB_OBJ(lqe),
1104                                                &lqe->lqe_id);
1105                 if (rc)
1106                         GOTO(out_stop, rc);
1107
1108                 rc = dt_trans_start_local(env, qmt->qmt_child, th);
1109                 if (rc)
1110                         GOTO(out_stop, rc);
1111
1112                 qmt_glb_write(env, th, lqe, 0, NULL);
1113 out_stop:
1114                 dt_trans_stop(env, qmt->qmt_child, th);
1115         }
1116 out:
1117         lqe->lqe_recalc_granted = 0;
1118         lqe_write_unlock(lqe);
1119
1120         return 0;
1121 }
1122
1123 #define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
1124 static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
1125 {
1126         char mdt_name[MDT_DEV_NAME_LEN];
1127         struct lustre_mount_info *lmi;
1128         struct obd_device *obd;
1129         int rc;
1130         ENTRY;
1131
1132         rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
1133         if (rc) {
1134                 CERROR("quota: cannot get server name from %s: rc = %d\n",
1135                        qmt->qmt_svname, rc);
1136                 RETURN(ERR_PTR(rc));
1137         }
1138
1139         strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
1140         lmi = server_get_mount(mdt_name);
1141         if (lmi == NULL) {
1142                 rc = -ENOENT;
1143                 CERROR("%s: cannot get mount info from %s: rc = %d\n",
1144                        qmt->qmt_svname, mdt_name, rc);
1145                 RETURN(ERR_PTR(rc));
1146         }
1147         obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
1148         lustre_put_lsi(lmi->lmi_sb);
1149
1150         RETURN(obd);
1151 }
1152
1153 static int qmt_pool_recalc(void *args)
1154 {
1155         struct qmt_pool_info *pool, *glbl_pool;
1156         struct rw_semaphore *sem = NULL;
1157         struct obd_device *obd;
1158         struct lu_env env;
1159         int i, rc, qtype, slaves_cnt;
1160         ENTRY;
1161
1162         pool = args;
1163         thread_set_flags(&pool->qpi_recalc_thread, SVC_RUNNING);
1164
1165         obd = qmt_get_mgc(pool->qpi_qmt);
1166         if (IS_ERR(obd))
1167                 GOTO(out, rc = PTR_ERR(obd));
1168         else
1169                 /* Waiting for the end of processing mgs config.
1170                  * It is needed to be sure all pools are configured. */
1171                 while (obd->obd_process_conf)
1172                         schedule_timeout_uninterruptible(cfs_time_seconds(1));
1173
1174         sem = qmt_sarr_rwsem(pool);
1175         LASSERT(sem);
1176         down_read(sem);
1177         /* Hold this to be sure that OSTs from this pool
1178          * can't do acquire/release.
1179          *
1180          * I guess below write semaphore could be a bottleneck
1181          * as qmt_dqacq would be blocked trying to hold
1182          * read_lock at qmt_pool_lookup->qti_pools_add.
1183          * But on the other hand adding/removing OSTs to the pool is
1184          * a rare operation. If finally this would be a problem,
1185          * we can consider another approach. For example we can
1186          * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
1187          * and go through appropriate OSTs. I don't use this approach now
1188          * as newly created pool hasn't lqes entries. So firstly we need
1189          * to get this lqes from the global pool index file. This
1190          * solution looks more complex, so leave it as it is. */
1191         down_write(&pool->qpi_recalc_sem);
1192
1193         rc = lu_env_init(&env, LCT_MD_THREAD);
1194         if (rc) {
1195                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1196                 GOTO(out, rc);
1197         }
1198
1199         glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
1200         if (IS_ERR(glbl_pool))
1201                 GOTO(out_env, rc = PTR_ERR(glbl_pool));
1202
1203         slaves_cnt = qmt_sarr_count(pool);
1204         CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
1205                slaves_cnt, pool->qpi_name);
1206
1207         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1208                 for (i = 0; i < slaves_cnt; i++) {
1209                         struct qmt_thread_info  *qti = qmt_info(&env);
1210                         struct dt_object *slv_obj;
1211                         struct obd_uuid uuid;
1212                         int idx;
1213
1214                         if (thread_is_stopping(&pool->qpi_recalc_thread))
1215                                 GOTO(out_stop, rc = 0);
1216                         idx = qmt_sarr_get_idx(pool, i);
1217                         LASSERT(idx >= 0);
1218
1219                         /* We don't need fsname here - anyway
1220                          * lquota_disk_slv_filename ignores it. */
1221                         snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1222                         lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1223                                             qtype);
1224                         /* look-up index file associated with acquiring slave */
1225                         slv_obj = lquota_disk_slv_find(&env,
1226                                                 glbl_pool->qpi_qmt->qmt_child,
1227                                                 glbl_pool->qpi_root,
1228                                                 &qti->qti_fid,
1229                                                 &uuid);
1230                         if (IS_ERR(slv_obj))
1231                                 GOTO(out_stop, rc = PTR_ERR(slv_obj));
1232
1233                         CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
1234                                slv_obj, uuid.uuid);
1235                         qmt_obj_recalc(&env, slv_obj,
1236                                        &pool->qpi_recalc_thread,
1237                                        pool->qpi_site[qtype]);
1238                         dt_object_put(&env, slv_obj);
1239                 }
1240                 /* Now go trough the site hash and compare lqe_granted
1241                  * with lqe_calc_granted. Write new value if disagree */
1242
1243                 cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
1244                                   qmt_site_recalc_cb, &env);
1245         }
1246         GOTO(out_stop, rc);
1247 out_stop:
1248         qpi_putref(&env, glbl_pool);
1249 out_env:
1250         lu_env_fini(&env);
1251 out:
1252         thread_set_flags(&pool->qpi_recalc_thread, SVC_STOPPED);
1253         wake_up(&pool->qpi_recalc_thread.t_ctl_waitq);
1254         clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
1255         /* Pool can't be changed, since sem has been down.
1256          * Thus until up_read, no one can restart recalc thread. */
1257         if (sem) {
1258                 up_read(sem);
1259                 up_write(&pool->qpi_recalc_sem);
1260         }
1261         qpi_putref(&env, pool);
1262
1263         return rc;
1264 }
1265
1266 static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
1267 {
1268         struct task_struct *task;
1269         char *name;
1270         int rc = 0;
1271
1272         if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
1273                 LASSERT(thread_is_stopped(&qpi->qpi_recalc_thread) ||
1274                         thread_is_init(&qpi->qpi_recalc_thread));
1275                 OBD_ALLOC(name, QPI_MAXNAME + sizeof("qmt_pool_recalc_"));
1276                 if (name == NULL)
1277                         RETURN(-ENOMEM);
1278
1279                 snprintf(name, QPI_MAXNAME, "qsd_reint_%s",
1280                          qpi->qpi_name);
1281
1282                 qpi_getref(qpi);
1283                 thread_set_flags(&qpi->qpi_recalc_thread, SVC_STARTING);
1284                 task = kthread_run(qmt_pool_recalc, qpi, name);
1285                 if (IS_ERR(task)) {
1286                         thread_set_flags(&qpi->qpi_recalc_thread, SVC_STOPPED);
1287                         clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
1288                         rc = PTR_ERR(task);
1289                         qpi_putref(env, qpi);
1290                 }
1291                 OBD_FREE(name, QPI_MAXNAME + sizeof("qmt_pool_recalc_"));
1292         }
1293
1294         RETURN(rc);
1295 }
1296
1297 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
1298 {
1299         struct ptlrpc_thread    *thread = &qpi->qpi_recalc_thread;
1300
1301         if (!thread_is_stopped(thread)) {
1302                 thread_set_flags(thread, SVC_STOPPING);
1303                 wake_up(&thread->t_ctl_waitq);
1304
1305                 wait_event_idle(thread->t_ctl_waitq,
1306                                 thread_is_stopped(thread));
1307         }
1308 }
1309
1310 static int qmt_pool_slv_nr_change(const struct lu_env *env,
1311                                   struct qmt_pool_info *pool,
1312                                   int idx, bool add)
1313 {
1314         struct qmt_pool_info *glbl_pool;
1315         int qtype;
1316
1317         glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
1318         if (IS_ERR(glbl_pool))
1319                 RETURN(PTR_ERR(glbl_pool));
1320
1321         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1322                 struct qmt_thread_info  *qti = qmt_info(env);
1323                 struct dt_object *slv_obj;
1324                 struct obd_uuid uuid;
1325
1326                 /* We don't need fsname here - anyway
1327                  * lquota_disk_slv_filename ignores it. */
1328                 snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1329                 lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1330                                     qtype);
1331                 /* look-up index file associated with acquiring slave */
1332                 slv_obj = lquota_disk_slv_find(env,
1333                                         glbl_pool->qpi_qmt->qmt_child,
1334                                         glbl_pool->qpi_root,
1335                                         &qti->qti_fid,
1336                                         &uuid);
1337                 if (IS_ERR(slv_obj))
1338                         continue;
1339
1340                 if (add)
1341                         pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
1342                 else
1343                         pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
1344                 dt_object_put(env, slv_obj);
1345         }
1346         qpi_putref(env, glbl_pool);
1347
1348         return 0;
1349 }
1350
1351 static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
1352                             char *slavename, bool add)
1353 {
1354         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1355         struct qmt_pool_info    *qpi;
1356         struct lu_env            env;
1357         int                      rc, idx;
1358         ENTRY;
1359
1360         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1361                 RETURN(-ENAMETOOLONG);
1362
1363         CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
1364                               "%s: pool %s, removing %s\n",
1365               obd->obd_name, poolname, slavename);
1366
1367         rc = server_name2index(slavename, &idx, NULL);
1368         if (rc != LDD_F_SV_TYPE_OST)
1369                 RETURN(-EINVAL);
1370
1371         rc = lu_env_init(&env, LCT_MD_THREAD);
1372         if (rc) {
1373                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1374                 RETURN(rc);
1375         }
1376
1377         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1378         if (IS_ERR(qpi)) {
1379                 CWARN("%s: can't find pool %s: rc = %long\n",
1380                       obd->obd_name, poolname, PTR_ERR(qpi));
1381                 GOTO(out, rc = PTR_ERR(qpi));
1382         }
1383
1384         rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
1385                    qmt_sarr_pool_rem(qpi, idx);
1386         if (rc) {
1387                 CERROR("%s: can't %s %s pool %s: rc = %d\n",
1388                        add ? "add to" : "remove", obd->obd_name,
1389                        slavename, poolname, rc);
1390                 GOTO(out_putref, rc);
1391         }
1392         qmt_pool_slv_nr_change(&env, qpi, idx, add);
1393         qmt_start_pool_recalc(&env, qpi);
1394
1395 out_putref:
1396         qpi_putref(&env, qpi);
1397 out:
1398         lu_env_fini(&env);
1399         RETURN(rc);
1400 }
1401
1402
1403
1404 /**
1405  * Add a single target device to the named pool.
1406  *
1407  * \param[in] obd       OBD device on which to add the pool
1408  * \param[in] poolname  name of the pool to which to add the target \a slavename
1409  * \param[in] slavename name of the target device to be added
1410  *
1411  * \retval              0 if \a slavename was (previously) added to the pool
1412  * \retval              negative error number on failure
1413  */
1414 int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
1415 {
1416         return qmt_pool_add_rem(obd, poolname, slavename, true);
1417 }
1418
1419 /**
1420  * Remove the named target from the specified pool.
1421  *
1422  * \param[in] obd       OBD device from which to remove \a poolname
1423  * \param[in] poolname  name of the pool to be changed
1424  * \param[in] slavename name of the target to remove from \a poolname
1425  *
1426  * \retval              0 on successfully removing \a slavename from the pool
1427  * \retval              negative number on error (e.g. \a slavename not in pool)
1428  */
1429 int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
1430 {
1431         return qmt_pool_add_rem(obd, poolname, slavename, false);
1432 }
1433
1434 /**
1435  * Remove the named pool from the QMT device.
1436  *
1437  * \param[in] obd       OBD device on which pool was previously created
1438  * \param[in] poolname  name of pool to remove from \a obd
1439  *
1440  * \retval              0 on successfully removing the pool
1441  * \retval              negative error numbers for failures
1442  */
1443 int qmt_pool_del(struct obd_device *obd, char *poolname)
1444 {
1445         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1446         struct qmt_pool_info    *qpi;
1447         struct lu_fid            fid;
1448         char                     buf[LQUOTA_NAME_MAX];
1449         struct lu_env            env;
1450         int                      rc;
1451         int                      qtype;
1452         ENTRY;
1453
1454         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1455                 RETURN(-ENAMETOOLONG);
1456
1457         CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
1458                poolname);
1459
1460         rc = lu_env_init(&env, LCT_MD_THREAD);
1461         if (rc) {
1462                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1463                 RETURN(rc);
1464         }
1465
1466         /* look-up pool in charge of this global index FID */
1467         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1468         if (IS_ERR(qpi)) {
1469                 /* Valid case for several MDTs at the same node -
1470                  * pool removed by the 1st MDT in config */
1471                 CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
1472                 lu_env_fini(&env);
1473                 RETURN(PTR_ERR(qpi));
1474         }
1475
1476         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1477                 lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
1478                 snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
1479                 rc = local_object_unlink(&env, qmt->qmt_child,
1480                                          qpi->qpi_root, buf);
1481                 if (rc)
1482                         CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
1483                               obd->obd_name, buf, poolname, rc);
1484         }
1485
1486         /* put ref from look-up */
1487         qpi_putref(&env, qpi);
1488         /* put last ref to free qpi */
1489         qpi_putref(&env, qpi);
1490
1491         snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
1492                  RES_NAME(LQUOTA_RES_DT), poolname);
1493         rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
1494         if (rc)
1495                 CWARN("%s: cannot unlink dir %s: rc = %d\n",
1496                       obd->obd_name, poolname, rc);
1497
1498         lu_env_fini(&env);
1499         RETURN(0);
1500 }
1501
1502 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
1503 {
1504
1505         /* No need to initialize sarray for global pool
1506          * as it always includes all slaves */
1507         if (qmt_pool_global(qpi))
1508                 return 0;
1509
1510         switch (qpi->qpi_rtype) {
1511         case LQUOTA_RES_DT:
1512                 return tgt_pool_init(&qpi->qpi_sarr.osts, 0);
1513         case LQUOTA_RES_MD:
1514         default:
1515                 return 0;
1516         }
1517 }
1518
1519 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
1520 {
1521         switch (qpi->qpi_rtype) {
1522         case LQUOTA_RES_DT:
1523                 return tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
1524         case LQUOTA_RES_MD:
1525         default:
1526                 return 0;
1527         }
1528 }
1529
1530 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
1531 {
1532         switch (qpi->qpi_rtype) {
1533         case LQUOTA_RES_DT:
1534                 return tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
1535         case LQUOTA_RES_MD:
1536         default:
1537                 return 0;
1538         }
1539 }
1540
1541 static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi)
1542 {
1543         if (qmt_pool_global(qpi))
1544                 return 0;
1545
1546         switch (qpi->qpi_rtype) {
1547         case LQUOTA_RES_DT:
1548                 if (!qpi->qpi_sarr.osts.op_array)
1549                         return 0;
1550                 return tgt_pool_free(&qpi->qpi_sarr.osts);
1551         case LQUOTA_RES_MD:
1552         default:
1553                 return 0;
1554         }
1555 }
1556
1557 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
1558 {
1559         if (qmt_pool_global(qpi))
1560                 return 0;
1561
1562         switch (qpi->qpi_rtype) {
1563         case LQUOTA_RES_DT:
1564                 return tgt_check_index(idx, &qpi->qpi_sarr.osts);
1565         case LQUOTA_RES_MD:
1566         default:
1567                 return 0;
1568         }
1569 }
1570
1571 inline struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
1572 {
1573         switch (qpi->qpi_rtype) {
1574         case LQUOTA_RES_DT:
1575                 /* to protect ost_pool use */
1576                 return &qpi->qpi_sarr.osts.op_rw_sem;
1577         case LQUOTA_RES_MD:
1578         default:
1579                 return NULL;
1580         }
1581 }
1582
1583 inline int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
1584 {
1585
1586         if (qmt_pool_global(qpi))
1587                 return arr_idx;
1588
1589         switch (qpi->qpi_rtype) {
1590         case LQUOTA_RES_DT:
1591                 LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
1592                          "idx invalid %d op_count %d\n", arr_idx,
1593                          qpi->qpi_sarr.osts.op_count);
1594                 return qpi->qpi_sarr.osts.op_array[arr_idx];
1595         case LQUOTA_RES_MD:
1596         default:
1597                 return -EINVAL;
1598         }
1599 }
1600
1601 /* Number of slaves in a pool */
1602 inline unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
1603 {
1604         switch (qpi->qpi_rtype) {
1605         case LQUOTA_RES_DT:
1606                 return qpi->qpi_sarr.osts.op_count;
1607         case LQUOTA_RES_MD:
1608         default:
1609                 return -EINVAL;
1610         }
1611 }