Whamcloud - gitweb
8d4fee7974887e055b3058a049043026cf637370
[fs/lustre-release.git] / lustre / quota / qmt_pool.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * A Quota Master Target has a list(qmt_pool_list) where it stores qmt_pool_info
33  * structures. There is one such structure for each pool managed by the QMT.
34  *
35  * Each pool can have different quota types enforced (typically user & group
36  * quota). A pool is in charge of managing lquota_entry structures for each
37  * quota type. This is done by creating one lquota_entry site per quota
38  * type. A site stores entries in a hash table and read quota settings from disk
39  * when a given ID isn't present in the hash.
40  *
41  * The pool API exported here is the following:
42  * - qmt_pool_init(): initializes the general QMT structures used to manage
43  *                    pools.
44  * - qmt_pool_fini(): frees the structures allocated by qmt_pool_fini().
45  * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46  * - qmt_pool_new_conn(): is used to create a new slave index file.
47  * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
48  *                          a given ID.
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
56
57 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
58 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
59                                     int idx, int min);
60 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
61 static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi);
62 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
63 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
64
65 /*
66  * Static helper functions not used outside the scope of this file
67  */
68
69 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
70 {
71         LASSERT(atomic_read(&pool->qpi_ref) > 1);
72         atomic_dec(&pool->qpi_ref);
73 }
74
75 /* some procfs helpers */
76 static int qpi_state_seq_show(struct seq_file *m, void *data)
77 {
78         struct qmt_pool_info    *pool = m->private;
79         int                      type;
80
81         LASSERT(pool != NULL);
82
83         seq_printf(m, "pool:\n"
84                    "    id: %u\n"
85                    "    type: %s\n"
86                    "    ref: %d\n"
87                    "    least qunit: %lu\n",
88                    0,
89                    RES_NAME(pool->qpi_rtype),
90                    atomic_read(&pool->qpi_ref),
91                    pool->qpi_least_qunit);
92
93         for (type = 0; type < LL_MAXQUOTAS; type++)
94                 seq_printf(m, "    %s:\n"
95                            "        #slv: %d\n"
96                            "        #lqe: %d\n",
97                            qtype_name(type),
98                            qpi_slv_nr(pool, type),
99                     atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
100
101         return 0;
102 }
103 LPROC_SEQ_FOPS_RO(qpi_state);
104
105 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
106 {
107         struct qmt_pool_info    *pool = m->private;
108         LASSERT(pool != NULL);
109
110         seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
111         return 0;
112 }
113
114 static ssize_t
115 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
116                                size_t count, loff_t *off)
117 {
118         struct seq_file *m = file->private_data;
119         struct qmt_pool_info *pool = m->private;
120         long long least_qunit;
121         int qunit, rc;
122
123         LASSERT(pool != NULL);
124
125         /* Not tuneable for inode limit */
126         if (pool->qpi_rtype != LQUOTA_RES_DT)
127                 return -EINVAL;
128
129         rc = kstrtoll_from_user(buffer, count, 0, &least_qunit);
130         if (rc)
131                 return rc;
132
133         /* Miminal qpi_soft_least_qunit */
134         qunit = pool->qpi_least_qunit << 2;
135         /* The value must be power of miminal qpi_soft_least_qunit, see
136          * how the qunit is adjusted in qmt_adjust_qunit(). */
137         while (qunit > 0 && qunit < least_qunit)
138                 qunit <<= 2;
139         if (qunit <= 0)
140                 qunit = INT_MAX & ~3;
141
142         pool->qpi_soft_least_qunit = qunit;
143         return count;
144 }
145 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
146
147 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
148         { .name =       "info",
149           .fops =       &qpi_state_fops },
150         { .name =       "soft_least_qunit",
151           .fops =       &qpi_soft_least_qunit_fops },
152         { NULL }
153 };
154
155 /*
156  * Allocate a new qmt_pool_info structure and add it to qmt_pool_list.
157  *
158  * \param env       - is the environment passed by the caller
159  * \param qmt       - is the quota master target
160  * \param pool_type - is the resource type of this pool instance, either
161  *                    LQUOTA_RES_MD or LQUOTA_RES_DT.
162  *
163  * \retval - 0 on success, appropriate error on failure
164  */
165 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
166                           char *pool_name, int pool_type)
167 {
168         struct qmt_thread_info  *qti = qmt_info(env);
169         struct qmt_pool_info    *pool;
170         int                      rc = 0;
171         ENTRY;
172
173         OBD_ALLOC_PTR(pool);
174         if (pool == NULL)
175                 RETURN(-ENOMEM);
176         INIT_LIST_HEAD(&pool->qpi_linkage);
177         init_rwsem(&pool->qpi_recalc_sem);
178
179         pool->qpi_rtype = pool_type;
180
181         /* initialize refcount to 1, hash table will then grab an additional
182          * reference */
183         atomic_set(&pool->qpi_ref, 1);
184
185         /* set up least qunit size to use for this pool */
186         pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
187         if (pool_type == LQUOTA_RES_DT)
188                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
189         else
190                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
191
192         /* grab reference on master target that this pool belongs to */
193         lu_device_get(qmt2lu_dev(qmt));
194         lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
195         pool->qpi_qmt = qmt;
196
197         /* create pool proc directory */
198         snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
199                  RES_NAME(pool_type), pool_name);
200         strncpy(pool->qpi_name, pool_name, QPI_MAXNAME);
201         pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
202                                           lprocfs_quota_qpi_vars, pool);
203         if (IS_ERR(pool->qpi_proc)) {
204                 rc = PTR_ERR(pool->qpi_proc);
205                 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
206                        qmt->qmt_svname, qti->qti_buf, rc);
207                 pool->qpi_proc = NULL;
208                 GOTO(out, rc);
209         }
210
211         rc = qmt_sarr_pool_init(pool);
212         if (rc)
213                 GOTO(out, rc);
214
215         /* add to qmt pool list */
216         down_write(&qmt->qmt_pool_lock);
217         list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
218         up_write(&qmt->qmt_pool_lock);
219         EXIT;
220 out:
221         if (rc)
222                 /* this frees the pool structure since refcount is equal to 1 */
223                 qpi_putref(env, pool);
224         return rc;
225 }
226
227 /*
228  * Delete a qmt_pool_info instance and all structures associated.
229  *
230  * \param env  - is the environment passed by the caller
231  * \param pool - is the qmt_pool_info structure to free
232  */
233 void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
234 {
235         struct  qmt_device *qmt = pool->qpi_qmt;
236         int     qtype;
237         ENTRY;
238
239         /* remove from list */
240         down_write(&qmt->qmt_pool_lock);
241         list_del_init(&pool->qpi_linkage);
242         up_write(&qmt->qmt_pool_lock);
243
244         if (atomic_read(&pool->qpi_ref) > 0)
245                 RETURN_EXIT;
246
247         qmt_stop_pool_recalc(pool);
248         qmt_sarr_pool_free(pool);
249
250         /* release proc entry */
251         if (pool->qpi_proc) {
252                 lprocfs_remove(&pool->qpi_proc);
253                 pool->qpi_proc = NULL;
254         }
255
256         /* release per-quota type site used to manage quota entries as well as
257          * references to global index files */
258         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
259                 /* release lqe storing grace time */
260                 if (pool->qpi_grace_lqe[qtype] != NULL)
261                         lqe_putref(pool->qpi_grace_lqe[qtype]);
262
263                 /* release site */
264                 if (pool->qpi_site[qtype] != NULL &&
265                     !IS_ERR(pool->qpi_site[qtype]))
266                         lquota_site_free(env, pool->qpi_site[qtype]);
267                 /* release reference to global index */
268                 if (pool->qpi_glb_obj[qtype] != NULL &&
269                     !IS_ERR(pool->qpi_glb_obj[qtype]))
270                         dt_object_put(env, pool->qpi_glb_obj[qtype]);
271         }
272
273         /* release reference on pool directory */
274         if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
275                 dt_object_put(env, pool->qpi_root);
276
277         /* release reference on the master target */
278         if (pool->qpi_qmt != NULL) {
279                 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
280
281                 lu_ref_del(&ld->ld_reference, "pool", pool);
282                 lu_device_put(ld);
283                 pool->qpi_qmt = NULL;
284         }
285
286         LASSERT(list_empty(&pool->qpi_linkage));
287         OBD_FREE_PTR(pool);
288 }
289
290 static inline void qti_pools_init(const struct lu_env *env)
291 {
292         struct qmt_thread_info  *qti = qmt_info(env);
293
294         qti->qti_pools_cnt = 0;
295         qti->qti_pools_num = QMT_MAX_POOL_NUM;
296 }
297
298 #define qti_pools(qti)  (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
299                                 qti->qti_pools : qti->qti_pools_small)
300 #define qti_pools_env(env) \
301         (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
302                 qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
303 #define qti_pools_cnt(env)      (qmt_info(env)->qti_pools_cnt)
304
305 static inline int qti_pools_add(const struct lu_env *env,
306                                 struct qmt_pool_info *qpi)
307 {
308         struct qmt_thread_info  *qti = qmt_info(env);
309         struct qmt_pool_info    **pools = qti->qti_pools;
310
311         pools = qti_pools(qti);
312         LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
313                  "Forgot init? %p\n", qti);
314
315         if (qti->qti_pools_cnt > qti->qti_pools_num) {
316                 OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
317                 if (!pools)
318                         return -ENOMEM;
319                 memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
320                 /* Don't need to free, if it is the very 1st allocation */
321                 if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
322                         OBD_FREE(qti->qti_pools,
323                                  qti->qti_pools_num * sizeof(qpi));
324                 qti->qti_pools = pools;
325                 qti->qti_pools_num *= 2;
326         }
327
328         qpi_getref(qpi);
329         /* Take this to protect pool's lqes against changing by
330          * recalculation thread. This would be unlocked at
331          * qti_pools_fini. */
332         down_read(&qpi->qpi_recalc_sem);
333         if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
334                 pools[qti->qti_pools_cnt++] = pools[0];
335                 /* Store global pool always at index 0 */
336                 pools[0] = qpi;
337         } else {
338                 pools[qti->qti_pools_cnt++] = qpi;
339         }
340
341         CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
342                qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
343
344         return 0;
345 }
346
347 static inline void qti_pools_fini(const struct lu_env *env)
348 {
349         struct qmt_thread_info  *qti = qmt_info(env);
350         struct qmt_pool_info    **pools = qti->qti_pools;
351         int i;
352
353         LASSERT(qti->qti_pools_cnt > 0);
354
355         pools = qti_pools(qti);
356         for (i = 0; i < qti->qti_pools_cnt; i++) {
357                 up_read(&pools[i]->qpi_recalc_sem);
358                 qpi_putref(env, pools[i]);
359         }
360
361         if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
362                 OBD_FREE(qti->qti_pools,
363                          qti->qti_pools_num * sizeof(struct qmt_pool_info *));
364 }
365
366 /*
367  * Look-up a pool in a list based on the type.
368  *
369  * \param env   - is the environment passed by the caller
370  * \param qmt   - is the quota master target
371  * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
372  *                    LQUOTA_RES_DT.
373  * \param pool_name - is the pool name to search for
374  * \param idx   - OST or MDT index to search for. When it is >= 0, function
375  *              returns array with pointers to all pools that include
376  *              targets with requested index.
377  * \param add   - add to qti_pool_arr if true
378  */
379 struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
380                                              struct qmt_device *qmt,
381                                              int rtype,
382                                              char *pool_name,
383                                              int idx, bool add)
384 {
385         struct qmt_pool_info    *pos, *pool;
386         int rc;
387         ENTRY;
388
389         down_read(&qmt->qmt_pool_lock);
390         if (list_empty(&qmt->qmt_pool_list)) {
391                 up_read(&qmt->qmt_pool_lock);
392                 RETURN(ERR_PTR(-ENOENT));
393         }
394
395         CDEBUG(D_QUOTA, "type %d name %s index %d\n",
396                rtype, pool_name ?: "<none>", idx);
397         /* Now just find a pool with correct type in a list. Further we need
398          * to go through the list and find a pool that includes requested OST
399          * or MDT. Possibly this would return a list of pools that includes
400          * needed target(OST/MDT). */
401         pool = NULL;
402         if (idx == -1 && !pool_name)
403                 pool_name = GLB_POOL_NAME;
404
405         list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
406                 if (pos->qpi_rtype != rtype)
407                         continue;
408
409                 if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
410                         rc = qti_pools_add(env, pos);
411                         if (rc)
412                                 GOTO(out_err, rc);
413                         continue;
414                 }
415
416                 if (pool_name && !strncmp(pool_name, pos->qpi_name,
417                                           LOV_MAXPOOLNAME)) {
418                         pool = pos;
419                         if (add) {
420                                 rc = qti_pools_add(env, pos);
421                                 if (rc)
422                                         GOTO(out_err, rc);
423                         } else {
424                                 qpi_getref(pool);
425                         }
426                         break;
427                 }
428         }
429         up_read(&qmt->qmt_pool_lock);
430
431         if (idx >= 0 && qti_pools_cnt(env))
432                 pool = qti_pools_env(env)[0];
433
434         RETURN(pool ? : ERR_PTR(-ENOENT));
435 out_err:
436         CERROR("%s: cannot add pool %s: err = %d\n",
437                 qmt->qmt_svname, pos->qpi_name, rc);
438         RETURN(ERR_PTR(rc));
439 }
440
441 /*
442  * Functions implementing the pool API, used by the qmt handlers
443  */
444
445 /*
446  * Destroy all pools which are still in the pool list.
447  *
448  * \param env - is the environment passed by the caller
449  * \param qmt - is the quota master target
450  *
451  */
452 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
453 {
454         struct qmt_pool_info *pool, *tmp;
455         ENTRY;
456
457         /* parse list of pool and destroy each element */
458         list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
459                 /* release extra reference taken in qmt_pool_alloc */
460                 qpi_putref(env, pool);
461         }
462         LASSERT(list_empty(&qmt->qmt_pool_list));
463
464         EXIT;
465 }
466
467 /*
468  * Initialize pool configure for the quota master target. For now, we only
469  * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
470  * pool which are instantiated in this function.
471  *
472  * \param env - is the environment passed by the caller
473  * \param qmt - is the quota master target for which we have to initialize the
474  *              pool configuration
475  *
476  * \retval - 0 on success, appropriate error on failure
477  */
478 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
479 {
480         int     res, rc = 0;
481         ENTRY;
482
483         INIT_LIST_HEAD(&qmt->qmt_pool_list);
484         init_rwsem(&qmt->qmt_pool_lock);
485
486         /* Instantiate pool master for the default data and metadata pool.
487          * This code will have to be revisited once we support quota on
488          * non-default pools */
489         for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
490                 rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
491                 if (rc)
492                         break;
493         }
494
495         if (rc)
496                 qmt_pool_fini(env, qmt);
497
498         RETURN(rc);
499 }
500
501 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
502                        char *slv_name, struct lu_fid *slv_fid, void *arg)
503 {
504         struct obd_uuid uuid;
505         int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
506         int stype, qtype;
507         int rc;
508
509         rc = lquota_extract_fid(glb_fid, NULL, &qtype);
510         LASSERT(!rc);
511
512         obd_str2uuid(&uuid, slv_name);
513         stype = qmt_uuid2idx(&uuid, NULL);
514         if (stype < 0)
515                 return stype;
516         /* one more slave */
517         (*nr)[stype][qtype]++;
518         CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
519                         slv_name, stype, qtype, (*nr)[stype][qtype]);
520
521         return 0;
522 }
523
524 /*
525  * Set up on-disk index files associated with each pool.
526  *
527  * \param env - is the environment passed by the caller
528  * \param qmt - is the quota master target for which we have to initialize the
529  *              pool configuration
530  * \param qmt_root - is the on-disk directory created for the QMT.
531  * \param name - is the pool name that we need to setup. Setup all pools
532  *               in qmt_pool_list when name is NULL.
533  *
534  * \retval - 0 on success, appropriate error on failure
535  */
536 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
537                      struct dt_object *qmt_root, char *name)
538 {
539         struct qmt_thread_info  *qti = qmt_info(env);
540         struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
541         struct qmt_pool_info    *pool;
542         struct dt_device        *dev = NULL;
543         dt_obj_version_t         version;
544         struct list_head        *pos;
545         int                      rc = 0, i, qtype;
546         ENTRY;
547
548         /* iterate over each pool in the list and allocate a quota site for each
549          * one. This involves creating a global index file on disk */
550         list_for_each(pos, &qmt->qmt_pool_list) {
551                 struct dt_object        *obj;
552                 struct lquota_entry     *lqe;
553                 char                    *pool_name;
554                 int                      rtype;
555
556                 pool = list_entry(pos, struct qmt_pool_info,
557                                   qpi_linkage);
558
559                 pool_name = pool->qpi_name;
560                 if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
561                         continue;
562                 rtype = pool->qpi_rtype;
563                 if (dev == NULL)
564                         dev = pool->qpi_qmt->qmt_child;
565
566                 /* allocate directory for this pool */
567                 snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
568                          RES_NAME(rtype), pool_name);
569                 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
570                                                   qti->qti_buf);
571                 if (IS_ERR(obj))
572                         RETURN(PTR_ERR(obj));
573                 pool->qpi_root = obj;
574
575                 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
576                         /* Generating FID of global index in charge of storing
577                          * settings for this quota type */
578                         lquota_generate_fid(&qti->qti_fid, rtype, qtype);
579
580                         /* open/create the global index file for this quota
581                          * type. If name is set, it means we came here from
582                          * qmt_pool_new and can create glb index with a
583                          * local generated FID. */
584                         obj = lquota_disk_glb_find_create(env, dev,
585                                                           pool->qpi_root,
586                                                           &qti->qti_fid,
587                                                           name ? true : false);
588                         if (IS_ERR(obj)) {
589                                 rc = PTR_ERR(obj);
590                                 CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
591                                        qmt->qmt_svname, qtype_name(qtype), rc);
592                                 RETURN(rc);
593                         }
594
595                         pool->qpi_glb_obj[qtype] = obj;
596
597                         version = dt_version_get(env, obj);
598                         /* set default grace time for newly created index */
599                         if (version == 0) {
600                                 rec->qbr_hardlimit = 0;
601                                 rec->qbr_softlimit = 0;
602                                 rec->qbr_granted = 0;
603                                 rec->qbr_time = rtype == LQUOTA_RES_MD ?
604                                         MAX_IQ_TIME : MAX_DQ_TIME;
605
606                                 rc = lquota_disk_write_glb(env, obj, 0, rec);
607                                 if (rc) {
608                                         CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
609                                                qmt->qmt_svname, qtype_name(qtype), rc);
610                                         RETURN(rc);
611                                 }
612
613                                 rc = lquota_disk_update_ver(env, dev, obj, 1);
614                                 if (rc) {
615                                         CERROR("%s: failed to set initial version for %s type: rc = %d\n",
616                                                qmt->qmt_svname, qtype_name(qtype), rc);
617                                         RETURN(rc);
618                                 }
619                         }
620
621                         /* create quota entry site for this quota type */
622                         pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
623                                                                   true, qtype,
624                                                                   &qmt_lqe_ops);
625                         if (IS_ERR(pool->qpi_site[qtype])) {
626                                 rc = PTR_ERR(pool->qpi_site[qtype]);
627                                 CERROR("%s: failed to create site for %s type: rc = %d\n",
628                                        qmt->qmt_svname, qtype_name(qtype), rc);
629                                 RETURN(rc);
630                         }
631
632                         /* count number of slaves which already connected to
633                          * the master in the past */
634                         for (i = 0; i < QMT_STYPE_CNT; i++)
635                                 pool->qpi_slv_nr[i][qtype] = 0;
636
637                         rc = lquota_disk_for_each_slv(env, pool->qpi_root,
638                                                       &qti->qti_fid,
639                                                       qmt_slv_cnt,
640                                                       &pool->qpi_slv_nr);
641                         if (rc) {
642                                 CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
643                                        qmt->qmt_svname, qtype_name(qtype), rc);
644                                 RETURN(rc);
645                         }
646
647                         /* Global grace time is stored in quota settings of
648                          * ID 0. */
649                         qti->qti_id.qid_uid = 0;
650
651                         /* look-up quota entry storing grace time */
652                         lqe = lqe_locate(env, pool->qpi_site[qtype],
653                                          &qti->qti_id);
654                         if (IS_ERR(lqe))
655                                 RETURN(PTR_ERR(lqe));
656                         pool->qpi_grace_lqe[qtype] = lqe;
657 #ifdef CONFIG_PROC_FS
658                         /* add procfs file to dump the global index, mostly for
659                          * debugging purpose */
660                         snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
661                                  "glb-%s", qtype_name(qtype));
662                         rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
663                                                 0444, &lprocfs_quota_seq_fops,
664                                                 obj);
665                         if (rc)
666                                 CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
667                                       qmt->qmt_svname, PFID(&qti->qti_fid), rc);
668 #endif
669                 }
670                 if (name)
671                         break;
672         }
673
674         RETURN(0);
675 }
676
677 /*
678  * Handle new slave connection. Called when a slave enqueues the global quota
679  * lock at the beginning of the reintegration procedure.
680  *
681  * \param env - is the environment passed by the caller
682  * \parap qmt - is the quota master target handling this request
683  * \param glb_fid - is the fid of the global index file
684  * \param slv_fid - is the fid of the newly created slave index file
685  * \param slv_ver - is the current version of the slave index file
686  * \param uuid    - is the uuid of slave which is (re)connecting to the master
687  *                  target
688  *
689  * \retval - 0 on success, appropriate error on failure
690  */
691 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
692                       struct lu_fid *glb_fid, struct lu_fid *slv_fid,
693                       __u64 *slv_ver, struct obd_uuid *uuid)
694 {
695         struct qmt_pool_info    *pool;
696         struct dt_object        *slv_obj;
697         int                      pool_type, qtype, stype;
698         bool                     created = false;
699         int                      idx, i, rc = 0;
700
701         stype = qmt_uuid2idx(uuid, &idx);
702         if (stype < 0)
703                 RETURN(stype);
704
705         /* extract pool info from global index FID */
706         rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
707         if (rc)
708                 RETURN(rc);
709
710         /* look-up pool in charge of this global index FID */
711         qti_pools_init(env);
712         pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
713         if (IS_ERR(pool))
714                 RETURN(PTR_ERR(pool));
715
716         /* look-up slave index file */
717         slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
718                                        glb_fid, uuid);
719         if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
720                 /* create slave index file */
721                 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
722                                                       pool->qpi_root, glb_fid,
723                                                       uuid, false);
724                 created = true;
725         }
726         if (IS_ERR(slv_obj)) {
727                 rc = PTR_ERR(slv_obj);
728                 CERROR("%s: failed to create quota slave index file for %s (%d)"
729                        "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
730                 GOTO(out, rc);
731         }
732
733         /* retrieve slave fid & current object version */
734         memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
735         *slv_ver = dt_version_get(env, slv_obj);
736         dt_object_put(env, slv_obj);
737         if (created)
738                 for (i = 0; i < qti_pools_cnt(env); i++)
739                         qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
740 out:
741         qti_pools_fini(env);
742         RETURN(rc);
743 }
744
745 /*
746  * Look-up a lquota_entry in the pool hash and allocate it if not found.
747  *
748  * \param env - is the environment passed by the caller
749  * \param qmt - is the quota master target for which we have to initialize the
750  *              pool configuration
751  * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
752  * \param qtype     - is the quota type, either user or group.
753  * \param qid       - is the quota ID to look-up
754  *
755  * \retval - valid pointer to lquota entry on success, appropriate error on
756  *           failure
757  */
758 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
759                                          struct qmt_device *qmt,
760                                          int pool_type, int qtype,
761                                          union lquota_id *qid,
762                                          char *pool_name)
763 {
764         struct qmt_pool_info    *pool;
765         struct lquota_entry     *lqe;
766         ENTRY;
767
768         /* look-up pool responsible for this global index FID */
769         pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
770         if (IS_ERR(pool))
771                 RETURN(ERR_CAST(pool));
772
773         if (qid->qid_uid == 0) {
774                 /* caller wants to access grace time, no need to look up the
775                  * entry since we keep a reference on ID 0 all the time */
776                 lqe = pool->qpi_grace_lqe[qtype];
777                 lqe_getref(lqe);
778                 GOTO(out, lqe);
779         }
780
781         /* now that we have the pool, let's look-up the quota entry in the
782          * right quota site */
783         lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
784 out:
785         qpi_putref(env, pool);
786         RETURN(lqe);
787 }
788
789 int qmt_pool_lqes_lookup(const struct lu_env *env,
790                          struct qmt_device *qmt,
791                          int rtype, int stype,
792                          int qtype, union lquota_id *qid,
793                          char *pool_name, int idx)
794 {
795         struct qmt_pool_info    *pool;
796         struct lquota_entry     *lqe;
797         int rc, i;
798         ENTRY;
799
800         /* Until MDT pools are not emplemented, all MDTs belong to
801          * global pool, thus lookup lqes only from global pool. */
802         if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
803                 idx = -1;
804
805         qti_pools_init(env);
806         rc = 0;
807         /* look-up pool responsible for this global index FID */
808         pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
809         if (IS_ERR(pool)) {
810                 qti_pools_fini(env);
811                 RETURN(PTR_ERR(pool));
812         }
813
814         /* now that we have the pool, let's look-up the quota entry in the
815          * right quota site */
816         qti_lqes_init(env);
817         for (i = 0; i < qti_pools_cnt(env); i++) {
818                 pool = qti_pools_env(env)[i];
819                 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
820                 if (IS_ERR(lqe)) {
821                         qti_lqes_fini(env);
822                         GOTO(out, rc = PTR_ERR(lqe));
823                 }
824                 /* Only release could be done for not enforced lqe
825                  * (see qmt_dqacq0). However slave could request to
826                  * release more than not global lqe had granted before
827                  * lqe_enforced was cleared. It is legal case,
828                  * because even if current lqe is not enforced,
829                  * lqes from other pools are still active and avilable
830                  * for acquiring. Furthermore, skip not enforced lqe
831                  * to don't make extra allocations. */
832                 /*if (!lqe_is_glbl(lqe) && !lqe->lqe_enforced) {
833                         lqe_putref(lqe);
834                         continue;
835                 }*/
836                 qti_lqes_add(env, lqe);
837         }
838         LASSERT(qti_lqes_glbl(env)->lqe_is_global);
839
840 out:
841         qti_pools_fini(env);
842         RETURN(rc);
843 }
844
845 static int lqes_cmp(const void *arg1, const void *arg2)
846 {
847         const struct lquota_entry *lqe1, *lqe2;
848
849         lqe1 = *(const struct lquota_entry **)arg1;
850         lqe2 = *(const struct lquota_entry **)arg2;
851         if (lqe1->lqe_qunit > lqe2->lqe_qunit)
852                 return 1;
853         if (lqe1->lqe_qunit < lqe2->lqe_qunit)
854                 return -1;
855         return 0;
856 }
857
858 void qmt_lqes_sort(const struct lu_env *env)
859 {
860         sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
861         /* global lqe was moved during sorting */
862         if (!qti_lqes_glbl(env)->lqe_is_global) {
863                 int i;
864                 for (i = 0; i < qti_lqes_cnt(env); i++) {
865                         if (qti_lqes(env)[i]->lqe_is_global) {
866                                 qti_glbl_lqe_idx(env) = i;
867                                 break;
868                         }
869                 }
870         }
871 }
872
873 int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
874                               int rtype, int qtype, union lquota_id *qid)
875 {
876         struct qmt_pool_info    *pos;
877         struct lquota_entry     *lqe;
878         int rc = 0;
879
880         qti_lqes_init(env);
881         down_read(&qmt->qmt_pool_lock);
882         if (list_empty(&qmt->qmt_pool_list)) {
883                 up_read(&qmt->qmt_pool_lock);
884                 RETURN(-ENOENT);
885         }
886
887         list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
888                 if (pos->qpi_rtype != rtype)
889                         continue;
890                 /* Don't take into account pools without slaves */
891                 if (!qpi_slv_nr(pos, qtype))
892                         continue;
893                 lqe = lqe_find(env, pos->qpi_site[qtype], qid);
894                 /* ENOENT is valid case for lqe from non global pool
895                  * that hasn't limits, i.e. not enforced. Continue even
896                  * in case of error - we can handle already found lqes */
897                 if (IS_ERR_OR_NULL(lqe)) {
898                         /* let know that something went wrong */
899                         rc = lqe ? PTR_ERR(lqe) : -ENOENT;
900                         continue;
901                 }
902                 if (!lqe->lqe_enforced) {
903                         /* no settings for this qid_uid */
904                         lqe_putref(lqe);
905                         continue;
906                 }
907                 qti_lqes_add(env, lqe);
908                 CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
909                                  lqe, pos->qpi_name);
910         }
911         up_read(&qmt->qmt_pool_lock);
912         RETURN(rc);
913 }
914
915 /**
916  * Allocate a new pool for the specified device.
917  *
918  * Allocate a new pool_desc structure for the specified \a new_pool
919  * device to create a pool with the given \a poolname.  The new pool
920  * structure is created with a single reference, and is freed when the
921  * reference count drops to zero.
922  *
923  * \param[in] obd       Lustre OBD device on which to add a pool iterator
924  * \param[in] poolname  the name of the pool to be created
925  *
926  * \retval              0 in case of success
927  * \retval              negative error code in case of error
928  */
929 int qmt_pool_new(struct obd_device *obd, char *poolname)
930 {
931         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
932         struct qmt_pool_info *qpi;
933         struct lu_env env;
934         int rc;
935         ENTRY;
936
937         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
938                 RETURN(-ENAMETOOLONG);
939
940         rc = lu_env_init(&env, LCT_MD_THREAD);
941         if (rc) {
942                 CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
943                 RETURN(rc);
944         }
945
946         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
947         if (!IS_ERR(qpi)) {
948                 /* Valid case when several MDTs are mounted
949                  * at the same node. */
950                 CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
951                 qpi_putref(&env, qpi);
952                 GOTO(out_env, rc = -EEXIST);
953         }
954         if (PTR_ERR(qpi) != -ENOENT) {
955                 CWARN("%s: pool %s lookup failed: rc = %ld\n",
956                       obd->obd_name, poolname, PTR_ERR(qpi));
957                 GOTO(out_env, rc = PTR_ERR(qpi));
958         }
959
960         /* Now allocate and prepare only DATA pool.
961          * Further when MDT pools will be ready we need to add
962          * a cycle here and setup pools of both types. Another
963          * approach is to find out pool of which type should be
964          * created. */
965         rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
966         if (rc) {
967                 CERROR("%s: can't alloc pool %s: rc = %d\n",
968                        obd->obd_name, poolname, rc);
969                 GOTO(out_env, rc);
970         }
971
972         rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
973         if (rc) {
974                 CERROR("%s: can't prepare pool for %s: rc = %d\n",
975                        obd->obd_name, poolname, rc);
976                 GOTO(out_err, rc);
977         }
978
979         CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
980                poolname);
981
982         GOTO(out_env, rc);
983 out_err:
984         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
985         if (!IS_ERR(qpi)) {
986                 qpi_putref(&env, qpi);
987                 qpi_putref(&env, qpi);
988         }
989 out_env:
990         lu_env_fini(&env);
991         return rc;
992 }
993
994 static int
995 qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
996                struct lquota_site *site)
997 {
998         struct qmt_thread_info *qti = qmt_info(env);
999         union lquota_id *qid = &qti->qti_id;
1000         const struct dt_it_ops *iops;
1001         struct dt_key *key;
1002         struct dt_it *it;
1003         __u64 granted;
1004         int rc;
1005         ENTRY;
1006
1007         iops = &obj->do_index_ops->dio_it;
1008
1009         it = iops->init(env, obj, 0);
1010         if (IS_ERR(it)) {
1011                 CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
1012                       PFID(&qti->qti_fid), PTR_ERR(it));
1013                 RETURN(PTR_ERR(it));
1014         }
1015
1016         rc = iops->load(env, it, 0);
1017         if (rc < 0) {
1018                 CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
1019                       PFID(&qti->qti_fid), rc);
1020                 GOTO(out, rc);
1021         } else if (rc == 0) {
1022                 rc = iops->next(env, it);
1023                 if (rc != 0)
1024                         GOTO(out, rc = (rc < 0) ? rc : 0);
1025         }
1026
1027         do {
1028                 struct lquota_entry *lqe;
1029
1030                 key = iops->key(env, it);
1031                 if (IS_ERR(key)) {
1032                         CWARN("quota: error key for "DFID": rc = %ld\n",
1033                               PFID(&qti->qti_fid), PTR_ERR(key));
1034                         GOTO(out, rc = PTR_ERR(key));
1035                 }
1036
1037                 /* skip the root user/group */
1038                 if (*((__u64 *)key) == 0)
1039                         goto next;
1040
1041                 qid->qid_uid = *((__u64 *)key);
1042
1043                 rc = qmt_slv_read(env, qid, obj, &granted);
1044                 if (!granted)
1045                         goto next;
1046
1047                 lqe = lqe_locate(env, site, qid);
1048                 if (IS_ERR(lqe))
1049                         GOTO(out, rc = PTR_ERR(lqe));
1050                 lqe_write_lock(lqe);
1051                 lqe->lqe_recalc_granted += granted;
1052                 lqe_write_unlock(lqe);
1053                 lqe_putref(lqe);
1054 next:
1055                 rc = iops->next(env, it);
1056                 if (rc < 0)
1057                         CWARN("quota: failed to parse index "DFID
1058                               ", ->next error: rc = %d\n",
1059                               PFID(&qti->qti_fid), rc);
1060         } while (rc == 0 && !kthread_should_stop());
1061
1062 out:
1063         iops->put(env, it);
1064         iops->fini(env, it);
1065         RETURN(rc);
1066 }
1067
1068 static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1069                               struct hlist_node *hnode, void *data)
1070 {
1071         struct lquota_entry     *lqe;
1072         struct lu_env *env = data;
1073
1074         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
1075         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
1076
1077         lqe_write_lock(lqe);
1078         if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
1079                 struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
1080                 struct thandle *th;
1081                 bool need_notify = false;
1082                 int rc;
1083
1084                 LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
1085                              lqe->lqe_recalc_granted);
1086                 lqe->lqe_granted = lqe->lqe_recalc_granted;
1087                 /* Always returns true, if there is no slaves in a pool */
1088                 need_notify |= qmt_adjust_qunit(env, lqe);
1089                 need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
1090                 if (need_notify) {
1091                         /* Find all lqes with lqe_id to reseed lgd array */
1092                         rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
1093                                                 lqe_qtype(lqe), &lqe->lqe_id);
1094                         if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
1095                                 qmt_seed_glbe(env,
1096                                         qti_lqes_glbl(env)->lqe_glbl_data);
1097                                 qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
1098                         }
1099                         qti_lqes_fini(env);
1100                 }
1101                 th = dt_trans_create(env, qmt->qmt_child);
1102                 if (IS_ERR(th))
1103                         goto out;
1104
1105                 rc = lquota_disk_declare_write(env, th,
1106                                                LQE_GLB_OBJ(lqe),
1107                                                &lqe->lqe_id);
1108                 if (rc)
1109                         GOTO(out_stop, rc);
1110
1111                 rc = dt_trans_start_local(env, qmt->qmt_child, th);
1112                 if (rc)
1113                         GOTO(out_stop, rc);
1114
1115                 qmt_glb_write(env, th, lqe, 0, NULL);
1116 out_stop:
1117                 dt_trans_stop(env, qmt->qmt_child, th);
1118         }
1119 out:
1120         lqe->lqe_recalc_granted = 0;
1121         lqe_write_unlock(lqe);
1122
1123         return 0;
1124 }
1125
1126 #define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
1127 static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
1128 {
1129         char mdt_name[MDT_DEV_NAME_LEN];
1130         struct lustre_mount_info *lmi;
1131         struct obd_device *obd;
1132         int rc;
1133         ENTRY;
1134
1135         rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
1136         if (rc) {
1137                 CERROR("quota: cannot get server name from %s: rc = %d\n",
1138                        qmt->qmt_svname, rc);
1139                 RETURN(ERR_PTR(rc));
1140         }
1141
1142         strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
1143         lmi = server_get_mount(mdt_name);
1144         if (lmi == NULL) {
1145                 rc = -ENOENT;
1146                 CERROR("%s: cannot get mount info from %s: rc = %d\n",
1147                        qmt->qmt_svname, mdt_name, rc);
1148                 RETURN(ERR_PTR(rc));
1149         }
1150         obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
1151         lustre_put_lsi(lmi->lmi_sb);
1152
1153         RETURN(obd);
1154 }
1155
1156 static int qmt_pool_recalc(void *args)
1157 {
1158         struct qmt_pool_info *pool, *glbl_pool;
1159         struct rw_semaphore *sem = NULL;
1160         struct obd_device *obd;
1161         struct lu_env env;
1162         int i, rc, qtype, slaves_cnt;
1163         ENTRY;
1164
1165         pool = args;
1166
1167         obd = qmt_get_mgc(pool->qpi_qmt);
1168         if (IS_ERR(obd))
1169                 GOTO(out, rc = PTR_ERR(obd));
1170         else
1171                 /* Waiting for the end of processing mgs config.
1172                  * It is needed to be sure all pools are configured. */
1173                 while (obd->obd_process_conf)
1174                         schedule_timeout_uninterruptible(cfs_time_seconds(1));
1175
1176         sem = qmt_sarr_rwsem(pool);
1177         LASSERT(sem);
1178         down_read(sem);
1179         /* Hold this to be sure that OSTs from this pool
1180          * can't do acquire/release.
1181          *
1182          * I guess below write semaphore could be a bottleneck
1183          * as qmt_dqacq would be blocked trying to hold
1184          * read_lock at qmt_pool_lookup->qti_pools_add.
1185          * But on the other hand adding/removing OSTs to the pool is
1186          * a rare operation. If finally this would be a problem,
1187          * we can consider another approach. For example we can
1188          * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
1189          * and go through appropriate OSTs. I don't use this approach now
1190          * as newly created pool hasn't lqes entries. So firstly we need
1191          * to get this lqes from the global pool index file. This
1192          * solution looks more complex, so leave it as it is. */
1193         down_write(&pool->qpi_recalc_sem);
1194
1195         rc = lu_env_init(&env, LCT_MD_THREAD);
1196         if (rc) {
1197                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1198                 GOTO(out, rc);
1199         }
1200
1201         glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
1202         if (IS_ERR(glbl_pool))
1203                 GOTO(out_env, rc = PTR_ERR(glbl_pool));
1204
1205         slaves_cnt = qmt_sarr_count(pool);
1206         CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
1207                slaves_cnt, pool->qpi_name);
1208
1209         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1210                 for (i = 0; i < slaves_cnt; i++) {
1211                         struct qmt_thread_info  *qti = qmt_info(&env);
1212                         struct dt_object *slv_obj;
1213                         struct obd_uuid uuid;
1214                         int idx;
1215
1216                         if (kthread_should_stop())
1217                                 GOTO(out_stop, rc = 0);
1218                         idx = qmt_sarr_get_idx(pool, i);
1219                         LASSERT(idx >= 0);
1220
1221                         /* We don't need fsname here - anyway
1222                          * lquota_disk_slv_filename ignores it. */
1223                         snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1224                         lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1225                                             qtype);
1226                         /* look-up index file associated with acquiring slave */
1227                         slv_obj = lquota_disk_slv_find(&env,
1228                                                 glbl_pool->qpi_qmt->qmt_child,
1229                                                 glbl_pool->qpi_root,
1230                                                 &qti->qti_fid,
1231                                                 &uuid);
1232                         if (IS_ERR(slv_obj))
1233                                 GOTO(out_stop, rc = PTR_ERR(slv_obj));
1234
1235                         CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
1236                                slv_obj, uuid.uuid);
1237                         qmt_obj_recalc(&env, slv_obj, pool->qpi_site[qtype]);
1238                         dt_object_put(&env, slv_obj);
1239                 }
1240                 /* Now go trough the site hash and compare lqe_granted
1241                  * with lqe_calc_granted. Write new value if disagree */
1242
1243                 cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
1244                                   qmt_site_recalc_cb, &env);
1245         }
1246         GOTO(out_stop, rc);
1247 out_stop:
1248         qpi_putref(&env, glbl_pool);
1249 out_env:
1250         lu_env_fini(&env);
1251 out:
1252         if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
1253                 /*
1254                  * Someone is waiting for us to stop - be sure not to exit
1255                  * before kthread_stop() gets a ref on the task.  No event
1256                  * will happen on 'pool, this is just a convenient way to
1257                  * wait.
1258                  */
1259                 wait_var_event(pool, kthread_should_stop());
1260
1261         clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
1262         /* Pool can't be changed, since sem has been down.
1263          * Thus until up_read, no one can restart recalc thread. */
1264         if (sem) {
1265                 up_read(sem);
1266                 up_write(&pool->qpi_recalc_sem);
1267         }
1268         qpi_putref(&env, pool);
1269
1270         return rc;
1271 }
1272
1273 static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
1274 {
1275         struct task_struct *task;
1276         int rc = 0;
1277
1278         if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
1279                 LASSERT(!qpi->qpi_recalc_task);
1280
1281                 qpi_getref(qpi);
1282                 task = kthread_create(qmt_pool_recalc, qpi,
1283                                       "qsd_reint_%s", qpi->qpi_name);
1284                 if (IS_ERR(task)) {
1285                         clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
1286                         rc = PTR_ERR(task);
1287                         qpi_putref(env, qpi);
1288                 } else {
1289                         qpi->qpi_recalc_task = task;
1290                         /* Using park/unpark to start the thread ensures that
1291                          * the thread function does get calls, so the
1292                          * ref on qpi will be dropped
1293                          */
1294                         kthread_park(task);
1295                         kthread_unpark(task);
1296                 }
1297         }
1298
1299         RETURN(rc);
1300 }
1301
1302 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
1303 {
1304         struct task_struct *task;
1305
1306         task = xchg(&qpi->qpi_recalc_task, NULL);
1307         if (task)
1308                 kthread_stop(task);
1309 }
1310
1311 static int qmt_pool_slv_nr_change(const struct lu_env *env,
1312                                   struct qmt_pool_info *pool,
1313                                   int idx, bool add)
1314 {
1315         struct qmt_pool_info *glbl_pool;
1316         int qtype;
1317
1318         glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
1319         if (IS_ERR(glbl_pool))
1320                 RETURN(PTR_ERR(glbl_pool));
1321
1322         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1323                 struct qmt_thread_info  *qti = qmt_info(env);
1324                 struct dt_object *slv_obj;
1325                 struct obd_uuid uuid;
1326
1327                 /* We don't need fsname here - anyway
1328                  * lquota_disk_slv_filename ignores it. */
1329                 snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1330                 lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1331                                     qtype);
1332                 /* look-up index file associated with acquiring slave */
1333                 slv_obj = lquota_disk_slv_find(env,
1334                                         glbl_pool->qpi_qmt->qmt_child,
1335                                         glbl_pool->qpi_root,
1336                                         &qti->qti_fid,
1337                                         &uuid);
1338                 if (IS_ERR(slv_obj))
1339                         continue;
1340
1341                 if (add)
1342                         pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
1343                 else
1344                         pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
1345                 dt_object_put(env, slv_obj);
1346         }
1347         qpi_putref(env, glbl_pool);
1348
1349         return 0;
1350 }
1351
1352 static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
1353                             char *slavename, bool add)
1354 {
1355         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1356         struct qmt_pool_info    *qpi;
1357         struct lu_env            env;
1358         int                      rc, idx;
1359         ENTRY;
1360
1361         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1362                 RETURN(-ENAMETOOLONG);
1363
1364         CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
1365                               "%s: pool %s, removing %s\n",
1366               obd->obd_name, poolname, slavename);
1367
1368         rc = server_name2index(slavename, &idx, NULL);
1369         if (rc != LDD_F_SV_TYPE_OST)
1370                 RETURN(-EINVAL);
1371
1372         rc = lu_env_init(&env, LCT_MD_THREAD);
1373         if (rc) {
1374                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1375                 RETURN(rc);
1376         }
1377
1378         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1379         if (IS_ERR(qpi)) {
1380                 CWARN("%s: can't find pool %s: rc = %long\n",
1381                       obd->obd_name, poolname, PTR_ERR(qpi));
1382                 GOTO(out, rc = PTR_ERR(qpi));
1383         }
1384
1385         rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
1386                    qmt_sarr_pool_rem(qpi, idx);
1387         if (rc) {
1388                 CERROR("%s: can't %s %s pool %s: rc = %d\n",
1389                        add ? "add to" : "remove", obd->obd_name,
1390                        slavename, poolname, rc);
1391                 GOTO(out_putref, rc);
1392         }
1393         qmt_pool_slv_nr_change(&env, qpi, idx, add);
1394         qmt_start_pool_recalc(&env, qpi);
1395
1396 out_putref:
1397         qpi_putref(&env, qpi);
1398 out:
1399         lu_env_fini(&env);
1400         RETURN(rc);
1401 }
1402
1403
1404
1405 /**
1406  * Add a single target device to the named pool.
1407  *
1408  * \param[in] obd       OBD device on which to add the pool
1409  * \param[in] poolname  name of the pool to which to add the target \a slavename
1410  * \param[in] slavename name of the target device to be added
1411  *
1412  * \retval              0 if \a slavename was (previously) added to the pool
1413  * \retval              negative error number on failure
1414  */
1415 int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
1416 {
1417         return qmt_pool_add_rem(obd, poolname, slavename, true);
1418 }
1419
1420 /**
1421  * Remove the named target from the specified pool.
1422  *
1423  * \param[in] obd       OBD device from which to remove \a poolname
1424  * \param[in] poolname  name of the pool to be changed
1425  * \param[in] slavename name of the target to remove from \a poolname
1426  *
1427  * \retval              0 on successfully removing \a slavename from the pool
1428  * \retval              negative number on error (e.g. \a slavename not in pool)
1429  */
1430 int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
1431 {
1432         return qmt_pool_add_rem(obd, poolname, slavename, false);
1433 }
1434
1435 /**
1436  * Remove the named pool from the QMT device.
1437  *
1438  * \param[in] obd       OBD device on which pool was previously created
1439  * \param[in] poolname  name of pool to remove from \a obd
1440  *
1441  * \retval              0 on successfully removing the pool
1442  * \retval              negative error numbers for failures
1443  */
1444 int qmt_pool_del(struct obd_device *obd, char *poolname)
1445 {
1446         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1447         struct qmt_pool_info    *qpi;
1448         struct lu_fid            fid;
1449         char                     buf[LQUOTA_NAME_MAX];
1450         struct lu_env            env;
1451         int                      rc;
1452         int                      qtype;
1453         ENTRY;
1454
1455         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1456                 RETURN(-ENAMETOOLONG);
1457
1458         CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
1459                poolname);
1460
1461         rc = lu_env_init(&env, LCT_MD_THREAD);
1462         if (rc) {
1463                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1464                 RETURN(rc);
1465         }
1466
1467         /* look-up pool in charge of this global index FID */
1468         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1469         if (IS_ERR(qpi)) {
1470                 /* Valid case for several MDTs at the same node -
1471                  * pool removed by the 1st MDT in config */
1472                 CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
1473                 lu_env_fini(&env);
1474                 RETURN(PTR_ERR(qpi));
1475         }
1476
1477         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1478                 lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
1479                 snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
1480                 rc = local_object_unlink(&env, qmt->qmt_child,
1481                                          qpi->qpi_root, buf);
1482                 if (rc)
1483                         CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
1484                               obd->obd_name, buf, poolname, rc);
1485         }
1486
1487         /* put ref from look-up */
1488         qpi_putref(&env, qpi);
1489         /* put last ref to free qpi */
1490         qpi_putref(&env, qpi);
1491
1492         snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
1493                  RES_NAME(LQUOTA_RES_DT), poolname);
1494         rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
1495         if (rc)
1496                 CWARN("%s: cannot unlink dir %s: rc = %d\n",
1497                       obd->obd_name, poolname, rc);
1498
1499         lu_env_fini(&env);
1500         RETURN(0);
1501 }
1502
1503 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
1504 {
1505
1506         /* No need to initialize sarray for global pool
1507          * as it always includes all slaves */
1508         if (qmt_pool_global(qpi))
1509                 return 0;
1510
1511         switch (qpi->qpi_rtype) {
1512         case LQUOTA_RES_DT:
1513                 return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
1514         case LQUOTA_RES_MD:
1515         default:
1516                 return 0;
1517         }
1518 }
1519
1520 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
1521 {
1522         switch (qpi->qpi_rtype) {
1523         case LQUOTA_RES_DT:
1524                 return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
1525         case LQUOTA_RES_MD:
1526         default:
1527                 return 0;
1528         }
1529 }
1530
1531 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
1532 {
1533         switch (qpi->qpi_rtype) {
1534         case LQUOTA_RES_DT:
1535                 return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
1536         case LQUOTA_RES_MD:
1537         default:
1538                 return 0;
1539         }
1540 }
1541
1542 static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi)
1543 {
1544         if (qmt_pool_global(qpi))
1545                 return 0;
1546
1547         switch (qpi->qpi_rtype) {
1548         case LQUOTA_RES_DT:
1549                 if (!qpi->qpi_sarr.osts.op_array)
1550                         return 0;
1551                 return lu_tgt_pool_free(&qpi->qpi_sarr.osts);
1552         case LQUOTA_RES_MD:
1553         default:
1554                 return 0;
1555         }
1556 }
1557
1558 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
1559 {
1560         if (qmt_pool_global(qpi))
1561                 return 0;
1562
1563         switch (qpi->qpi_rtype) {
1564         case LQUOTA_RES_DT:
1565                 return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
1566         case LQUOTA_RES_MD:
1567         default:
1568                 return 0;
1569         }
1570 }
1571
1572 struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
1573 {
1574         switch (qpi->qpi_rtype) {
1575         case LQUOTA_RES_DT:
1576                 /* to protect ost_pool use */
1577                 return &qpi->qpi_sarr.osts.op_rw_sem;
1578         case LQUOTA_RES_MD:
1579         default:
1580                 return NULL;
1581         }
1582 }
1583
1584 int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
1585 {
1586
1587         if (qmt_pool_global(qpi))
1588                 return arr_idx;
1589
1590         switch (qpi->qpi_rtype) {
1591         case LQUOTA_RES_DT:
1592                 LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
1593                          "idx invalid %d op_count %d\n", arr_idx,
1594                          qpi->qpi_sarr.osts.op_count);
1595                 return qpi->qpi_sarr.osts.op_array[arr_idx];
1596         case LQUOTA_RES_MD:
1597         default:
1598                 return -EINVAL;
1599         }
1600 }
1601
1602 /* Number of slaves in a pool */
1603 unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
1604 {
1605         switch (qpi->qpi_rtype) {
1606         case LQUOTA_RES_DT:
1607                 return qpi->qpi_sarr.osts.op_count;
1608         case LQUOTA_RES_MD:
1609         default:
1610                 return -EINVAL;
1611         }
1612 }