Whamcloud - gitweb
LU-15785 tests: do not detect versions for RPC_MODE mode
[fs/lustre-release.git] / lustre / quota / qmt_pool.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * A Quota Master Target has a list(qmt_pool_list) where it stores qmt_pool_info
33  * structures. There is one such structure for each pool managed by the QMT.
34  *
35  * Each pool can have different quota types enforced (typically user & group
36  * quota). A pool is in charge of managing lquota_entry structures for each
37  * quota type. This is done by creating one lquota_entry site per quota
38  * type. A site stores entries in a hash table and read quota settings from disk
39  * when a given ID isn't present in the hash.
40  *
41  * The pool API exported here is the following:
42  * - qmt_pool_init(): initializes the general QMT structures used to manage
43  *                    pools.
44  * - qmt_pool_fini(): frees the structures allocated by qmt_pool_fini().
45  * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46  * - qmt_pool_new_conn(): is used to create a new slave index file.
47  * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
48  *                          a given ID.
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
56
57 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
58 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
59                                     int idx, int min);
60 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
61 static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi);
62 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
63 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
64
65 /*
66  * Static helper functions not used outside the scope of this file
67  */
68
69 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
70 {
71         LASSERT(atomic_read(&pool->qpi_ref) > 1);
72         atomic_dec(&pool->qpi_ref);
73 }
74
75 /* some procfs helpers */
76 static int qpi_state_seq_show(struct seq_file *m, void *data)
77 {
78         struct qmt_pool_info    *pool = m->private;
79         int                      type;
80
81         LASSERT(pool != NULL);
82         if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
83                 return -ENOENT;
84
85         seq_printf(m, "pool:\n"
86                    "    id: %u\n"
87                    "    type: %s\n"
88                    "    ref: %d\n"
89                    "    least qunit: %lu\n",
90                    0,
91                    RES_NAME(pool->qpi_rtype),
92                    atomic_read(&pool->qpi_ref),
93                    pool->qpi_least_qunit);
94
95         for (type = 0; type < LL_MAXQUOTAS; type++)
96                 seq_printf(m, "    %s:\n"
97                            "        #slv: %d\n"
98                            "        #lqe: %d\n",
99                            qtype_name(type),
100                            qpi_slv_nr(pool, type),
101                     atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
102
103         return 0;
104 }
105 LPROC_SEQ_FOPS_RO(qpi_state);
106
107 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
108 {
109         struct qmt_pool_info    *pool = m->private;
110         LASSERT(pool != NULL);
111         if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
112                 return -ENOENT;
113
114         seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
115         return 0;
116 }
117
118 static ssize_t
119 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
120                                size_t count, loff_t *off)
121 {
122         struct seq_file *m = file->private_data;
123         struct qmt_pool_info *pool = m->private;
124         long long least_qunit, qunit;
125         int rc;
126
127         LASSERT(pool != NULL);
128         if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
129                 return -ENOENT;
130
131         /* Not tuneable for inode limit */
132         if (pool->qpi_rtype != LQUOTA_RES_DT)
133                 return -EINVAL;
134
135         rc = kstrtoll_from_user(buffer, count, 0, &least_qunit);
136         if (rc)
137                 return rc;
138
139         /* Miminal qpi_soft_least_qunit */
140         qunit = pool->qpi_least_qunit << 2;
141         /* The value must be power of miminal qpi_soft_least_qunit, see
142          * how the qunit is adjusted in qmt_adjust_qunit(). */
143         while (qunit > 0 && qunit < least_qunit)
144                 qunit <<= 2;
145         if (qunit <= 0)
146                 qunit = INT_MAX & ~3;
147
148         pool->qpi_soft_least_qunit = qunit;
149         return count;
150 }
151 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
152
153 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
154         { .name =       "info",
155           .fops =       &qpi_state_fops },
156         { .name =       "soft_least_qunit",
157           .fops =       &qpi_soft_least_qunit_fops },
158         { NULL }
159 };
160
161 /*
162  * Allocate a new qmt_pool_info structure and add it to qmt_pool_list.
163  *
164  * \param env       - is the environment passed by the caller
165  * \param qmt       - is the quota master target
166  * \param pool_type - is the resource type of this pool instance, either
167  *                    LQUOTA_RES_MD or LQUOTA_RES_DT.
168  *
169  * \retval - 0 on success, appropriate error on failure
170  */
171 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
172                           char *pool_name, int pool_type)
173 {
174         struct qmt_thread_info  *qti = qmt_info(env);
175         struct qmt_pool_info    *pool;
176         int                      rc = 0;
177         ENTRY;
178
179         OBD_ALLOC_PTR(pool);
180         if (pool == NULL)
181                 RETURN(-ENOMEM);
182         INIT_LIST_HEAD(&pool->qpi_linkage);
183         init_rwsem(&pool->qpi_recalc_sem);
184
185         pool->qpi_rtype = pool_type;
186         pool->qpi_flags = 0;
187
188         /* initialize refcount to 1, hash table will then grab an additional
189          * reference */
190         atomic_set(&pool->qpi_ref, 1);
191
192         /* set up least qunit size to use for this pool */
193         pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
194         if (pool_type == LQUOTA_RES_DT)
195                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
196         else
197                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
198
199         /* grab reference on master target that this pool belongs to */
200         lu_device_get(qmt2lu_dev(qmt));
201         lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
202         pool->qpi_qmt = qmt;
203
204         /* create pool proc directory */
205         snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
206                  RES_NAME(pool_type), pool_name);
207         strncpy(pool->qpi_name, pool_name, QPI_MAXNAME);
208         pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
209                                           lprocfs_quota_qpi_vars, pool);
210         if (IS_ERR(pool->qpi_proc)) {
211                 rc = PTR_ERR(pool->qpi_proc);
212                 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
213                        qmt->qmt_svname, qti->qti_buf, rc);
214                 pool->qpi_proc = NULL;
215                 GOTO(out, rc);
216         }
217
218         rc = qmt_sarr_pool_init(pool);
219         if (rc)
220                 GOTO(out, rc);
221
222         /* add to qmt pool list */
223         down_write(&qmt->qmt_pool_lock);
224         list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
225         up_write(&qmt->qmt_pool_lock);
226         EXIT;
227 out:
228         if (rc)
229                 /* this frees the pool structure since refcount is equal to 1 */
230                 qpi_putref(env, pool);
231         return rc;
232 }
233
234 /*
235  * Delete a qmt_pool_info instance and all structures associated.
236  *
237  * \param env  - is the environment passed by the caller
238  * \param pool - is the qmt_pool_info structure to free
239  */
240 void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
241 {
242         struct  qmt_device *qmt = pool->qpi_qmt;
243         int     qtype;
244         ENTRY;
245
246         /* remove from list */
247         down_write(&qmt->qmt_pool_lock);
248         list_del_init(&pool->qpi_linkage);
249         up_write(&qmt->qmt_pool_lock);
250
251         if (atomic_read(&pool->qpi_ref) > 0)
252                 RETURN_EXIT;
253
254         qmt_stop_pool_recalc(pool);
255         qmt_sarr_pool_free(pool);
256
257         /* release proc entry */
258         if (pool->qpi_proc) {
259                 lprocfs_remove(&pool->qpi_proc);
260                 pool->qpi_proc = NULL;
261         }
262
263         /* release per-quota type site used to manage quota entries as well as
264          * references to global index files */
265         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
266                 /* release lqe storing grace time */
267                 if (pool->qpi_grace_lqe[qtype] != NULL)
268                         lqe_putref(pool->qpi_grace_lqe[qtype]);
269
270                 /* release site */
271                 if (pool->qpi_site[qtype] != NULL &&
272                     !IS_ERR(pool->qpi_site[qtype]))
273                         lquota_site_free(env, pool->qpi_site[qtype]);
274                 /* release reference to global index */
275                 if (pool->qpi_glb_obj[qtype] != NULL &&
276                     !IS_ERR(pool->qpi_glb_obj[qtype]))
277                         dt_object_put(env, pool->qpi_glb_obj[qtype]);
278         }
279
280         /* release reference on pool directory */
281         if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
282                 dt_object_put(env, pool->qpi_root);
283
284         /* release reference on the master target */
285         if (pool->qpi_qmt != NULL) {
286                 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
287
288                 lu_ref_del(&ld->ld_reference, "pool", pool);
289                 lu_device_put(ld);
290                 pool->qpi_qmt = NULL;
291         }
292
293         LASSERT(list_empty(&pool->qpi_linkage));
294         OBD_FREE_PTR(pool);
295 }
296
297 static inline void qti_pools_init(const struct lu_env *env)
298 {
299         struct qmt_thread_info  *qti = qmt_info(env);
300
301         qti->qti_pools_cnt = 0;
302         qti->qti_pools_num = QMT_MAX_POOL_NUM;
303 }
304
305 #define qti_pools(qti)  (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
306                                 qti->qti_pools : qti->qti_pools_small)
307 #define qti_pools_env(env) \
308         (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
309                 qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
310 #define qti_pools_cnt(env)      (qmt_info(env)->qti_pools_cnt)
311
312 static inline int qti_pools_add(const struct lu_env *env,
313                                 struct qmt_pool_info *qpi)
314 {
315         struct qmt_thread_info  *qti = qmt_info(env);
316         struct qmt_pool_info    **pools = qti->qti_pools;
317
318         pools = qti_pools(qti);
319         LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
320                  "Forgot init? %p\n", qti);
321
322         if (qti->qti_pools_cnt >= qti->qti_pools_num) {
323                 OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
324                 if (!pools)
325                         return -ENOMEM;
326                 memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
327                 /* Don't need to free, if it is the very 1st allocation */
328                 if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
329                         OBD_FREE(qti->qti_pools,
330                                  qti->qti_pools_num * sizeof(qpi));
331                 qti->qti_pools = pools;
332                 qti->qti_pools_num *= 2;
333         }
334
335         qpi_getref(qpi);
336         /* Take this to protect pool's lqes against changing by
337          * recalculation thread. This would be unlocked at
338          * qti_pools_fini. */
339         down_read(&qpi->qpi_recalc_sem);
340         if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
341                 pools[qti->qti_pools_cnt++] = pools[0];
342                 /* Store global pool always at index 0 */
343                 pools[0] = qpi;
344         } else {
345                 pools[qti->qti_pools_cnt++] = qpi;
346         }
347
348         CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
349                qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
350
351         return 0;
352 }
353
354 static inline void qti_pools_fini(const struct lu_env *env)
355 {
356         struct qmt_thread_info  *qti = qmt_info(env);
357         struct qmt_pool_info    **pools = qti->qti_pools;
358         int i;
359
360         LASSERT(qti->qti_pools_cnt > 0);
361
362         pools = qti_pools(qti);
363         for (i = 0; i < qti->qti_pools_cnt; i++) {
364                 up_read(&pools[i]->qpi_recalc_sem);
365                 qpi_putref(env, pools[i]);
366         }
367
368         if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
369                 OBD_FREE(qti->qti_pools,
370                          qti->qti_pools_num * sizeof(struct qmt_pool_info *));
371 }
372
373 /*
374  * Look-up a pool in a list based on the type.
375  *
376  * \param env   - is the environment passed by the caller
377  * \param qmt   - is the quota master target
378  * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
379  *                    LQUOTA_RES_DT.
380  * \param pool_name - is the pool name to search for
381  * \param idx   - OST or MDT index to search for. When it is >= 0, function
382  *              returns array with pointers to all pools that include
383  *              targets with requested index.
384  * \param add   - add to qti_pool_arr if true
385  */
386 struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
387                                              struct qmt_device *qmt,
388                                              int rtype,
389                                              char *pool_name,
390                                              int idx, bool add)
391 {
392         struct qmt_pool_info    *pos, *pool;
393         int rc = 0;
394         ENTRY;
395
396         down_read(&qmt->qmt_pool_lock);
397         if (list_empty(&qmt->qmt_pool_list)) {
398                 up_read(&qmt->qmt_pool_lock);
399                 RETURN(ERR_PTR(-ENOENT));
400         }
401
402         CDEBUG(D_QUOTA, "type %d name %s index %d\n",
403                rtype, pool_name ?: "<none>", idx);
404         /* Now just find a pool with correct type in a list. Further we need
405          * to go through the list and find a pool that includes requested OST
406          * or MDT. Possibly this would return a list of pools that includes
407          * needed target(OST/MDT). */
408         pool = NULL;
409         if (idx == -1 && !pool_name)
410                 pool_name = GLB_POOL_NAME;
411
412         list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
413                 if (pos->qpi_rtype != rtype)
414                         continue;
415
416                 if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
417                         rc = qti_pools_add(env, pos);
418                         if (rc)
419                                 break;
420                         continue;
421                 }
422
423                 if (pool_name && !strncmp(pool_name, pos->qpi_name,
424                                           LOV_MAXPOOLNAME)) {
425                         pool = pos;
426                         if (add) {
427                                 rc = qti_pools_add(env, pos);
428                                 if (rc)
429                                         break;
430                         } else {
431                                 qpi_getref(pool);
432                         }
433                         break;
434                 }
435         }
436         up_read(&qmt->qmt_pool_lock);
437
438         if (rc)
439                 GOTO(out_err, rc);
440
441         if (idx >= 0 && qti_pools_cnt(env))
442                 pool = qti_pools_env(env)[0];
443
444         RETURN(pool ? : ERR_PTR(-ENOENT));
445 out_err:
446         CERROR("%s: cannot add pool %s: err = %d\n",
447                 qmt->qmt_svname, pos->qpi_name, rc);
448         return ERR_PTR(rc);
449 }
450
451 /*
452  * Functions implementing the pool API, used by the qmt handlers
453  */
454
455 /*
456  * Destroy all pools which are still in the pool list.
457  *
458  * \param env - is the environment passed by the caller
459  * \param qmt - is the quota master target
460  *
461  */
462 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
463 {
464         struct qmt_pool_info *pool, *tmp;
465         ENTRY;
466
467         /* parse list of pool and destroy each element */
468         list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
469                 /* stop all recalc threads - it may hold qpi reference */
470                 qmt_stop_pool_recalc(pool);
471                 /* release extra reference taken in qmt_pool_alloc */
472                 qpi_putref(env, pool);
473         }
474         LASSERT(list_empty(&qmt->qmt_pool_list));
475
476         EXIT;
477 }
478
479 /*
480  * Initialize pool configure for the quota master target. For now, we only
481  * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
482  * pool which are instantiated in this function.
483  *
484  * \param env - is the environment passed by the caller
485  * \param qmt - is the quota master target for which we have to initialize the
486  *              pool configuration
487  *
488  * \retval - 0 on success, appropriate error on failure
489  */
490 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
491 {
492         int     res, rc = 0;
493         ENTRY;
494
495         INIT_LIST_HEAD(&qmt->qmt_pool_list);
496         init_rwsem(&qmt->qmt_pool_lock);
497
498         /* Instantiate pool master for the default data and metadata pool.
499          * This code will have to be revisited once we support quota on
500          * non-default pools */
501         for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
502                 rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
503                 if (rc)
504                         break;
505         }
506
507         if (rc)
508                 qmt_pool_fini(env, qmt);
509
510         RETURN(rc);
511 }
512
513 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
514                        char *slv_name, struct lu_fid *slv_fid, void *arg)
515 {
516         struct obd_uuid uuid;
517         int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
518         int stype, qtype;
519         int rc;
520
521         rc = lquota_extract_fid(glb_fid, NULL, &qtype);
522         LASSERT(!rc);
523
524         obd_str2uuid(&uuid, slv_name);
525         stype = qmt_uuid2idx(&uuid, NULL);
526         if (stype < 0)
527                 return stype;
528         /* one more slave */
529         (*nr)[stype][qtype]++;
530         CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
531                         slv_name, stype, qtype, (*nr)[stype][qtype]);
532
533         return 0;
534 }
535
536 /*
537  * Set up on-disk index files associated with each pool.
538  *
539  * \param env - is the environment passed by the caller
540  * \param qmt - is the quota master target for which we have to initialize the
541  *              pool configuration
542  * \param qmt_root - is the on-disk directory created for the QMT.
543  * \param name - is the pool name that we need to setup. Setup all pools
544  *               in qmt_pool_list when name is NULL.
545  *
546  * \retval - 0 on success, appropriate error on failure
547  */
548 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
549                      struct dt_object *qmt_root, char *name)
550 {
551         struct qmt_thread_info  *qti = qmt_info(env);
552         struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
553         struct qmt_pool_info    *pool;
554         struct dt_device        *dev = NULL;
555         dt_obj_version_t         version;
556         struct list_head        *pos;
557         int                      rc = 0, i, qtype;
558         ENTRY;
559
560         /* iterate over each pool in the list and allocate a quota site for each
561          * one. This involves creating a global index file on disk */
562         list_for_each(pos, &qmt->qmt_pool_list) {
563                 struct dt_object        *obj;
564                 struct lquota_entry     *lqe;
565                 char                    *pool_name;
566                 int                      rtype;
567
568                 pool = list_entry(pos, struct qmt_pool_info,
569                                   qpi_linkage);
570
571                 pool_name = pool->qpi_name;
572                 if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
573                         continue;
574                 rtype = pool->qpi_rtype;
575                 if (dev == NULL)
576                         dev = pool->qpi_qmt->qmt_child;
577
578                 /* allocate directory for this pool */
579                 snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
580                          RES_NAME(rtype), pool_name);
581                 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
582                                                   qti->qti_buf);
583                 if (IS_ERR(obj))
584                         RETURN(PTR_ERR(obj));
585                 pool->qpi_root = obj;
586
587                 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
588                         /* Generating FID of global index in charge of storing
589                          * settings for this quota type */
590                         lquota_generate_fid(&qti->qti_fid, rtype, qtype);
591
592                         /* open/create the global index file for this quota
593                          * type. If name is set, it means we came here from
594                          * qmt_pool_new and can create glb index with a
595                          * local generated FID. */
596                         obj = lquota_disk_glb_find_create(env, dev,
597                                                           pool->qpi_root,
598                                                           &qti->qti_fid,
599                                                           name ? true : false);
600                         if (IS_ERR(obj)) {
601                                 rc = PTR_ERR(obj);
602                                 CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
603                                        qmt->qmt_svname, qtype_name(qtype), rc);
604                                 RETURN(rc);
605                         }
606
607                         pool->qpi_glb_obj[qtype] = obj;
608
609                         version = dt_version_get(env, obj);
610                         /* set default grace time for newly created index */
611                         if (version == 0) {
612                                 rec->qbr_hardlimit = 0;
613                                 rec->qbr_softlimit = 0;
614                                 rec->qbr_granted = 0;
615                                 rec->qbr_time = rtype == LQUOTA_RES_MD ?
616                                         MAX_IQ_TIME : MAX_DQ_TIME;
617
618                                 rc = lquota_disk_write_glb(env, obj, 0, rec);
619                                 if (rc) {
620                                         CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
621                                                qmt->qmt_svname, qtype_name(qtype), rc);
622                                         RETURN(rc);
623                                 }
624
625                                 rc = lquota_disk_update_ver(env, dev, obj, 1);
626                                 if (rc) {
627                                         CERROR("%s: failed to set initial version for %s type: rc = %d\n",
628                                                qmt->qmt_svname, qtype_name(qtype), rc);
629                                         RETURN(rc);
630                                 }
631                         }
632
633                         /* create quota entry site for this quota type */
634                         pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
635                                                                   true, qtype,
636                                                                   &qmt_lqe_ops);
637                         if (IS_ERR(pool->qpi_site[qtype])) {
638                                 rc = PTR_ERR(pool->qpi_site[qtype]);
639                                 CERROR("%s: failed to create site for %s type: rc = %d\n",
640                                        qmt->qmt_svname, qtype_name(qtype), rc);
641                                 RETURN(rc);
642                         }
643
644                         /* count number of slaves which already connected to
645                          * the master in the past */
646                         for (i = 0; i < QMT_STYPE_CNT; i++)
647                                 pool->qpi_slv_nr[i][qtype] = 0;
648
649                         rc = lquota_disk_for_each_slv(env, pool->qpi_root,
650                                                       &qti->qti_fid,
651                                                       qmt_slv_cnt,
652                                                       &pool->qpi_slv_nr);
653                         if (rc) {
654                                 CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
655                                        qmt->qmt_svname, qtype_name(qtype), rc);
656                                 RETURN(rc);
657                         }
658
659                         /* Global grace time is stored in quota settings of
660                          * ID 0. */
661                         qti->qti_id.qid_uid = 0;
662
663                         /* look-up quota entry storing grace time */
664                         lqe = lqe_locate(env, pool->qpi_site[qtype],
665                                          &qti->qti_id);
666                         if (IS_ERR(lqe))
667                                 RETURN(PTR_ERR(lqe));
668                         pool->qpi_grace_lqe[qtype] = lqe;
669 #ifdef CONFIG_PROC_FS
670                         /* add procfs file to dump the global index, mostly for
671                          * debugging purpose */
672                         snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
673                                  "glb-%s", qtype_name(qtype));
674                         rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
675                                                 0444, &lprocfs_quota_seq_fops,
676                                                 obj);
677                         if (rc)
678                                 CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
679                                       qmt->qmt_svname, PFID(&qti->qti_fid), rc);
680 #endif
681                 }
682                 set_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags);
683                 if (name)
684                         break;
685         }
686
687         RETURN(0);
688 }
689
690 /*
691  * Handle new slave connection. Called when a slave enqueues the global quota
692  * lock at the beginning of the reintegration procedure.
693  *
694  * \param env - is the environment passed by the caller
695  * \parap qmt - is the quota master target handling this request
696  * \param glb_fid - is the fid of the global index file
697  * \param slv_fid - is the fid of the newly created slave index file
698  * \param slv_ver - is the current version of the slave index file
699  * \param uuid    - is the uuid of slave which is (re)connecting to the master
700  *                  target
701  *
702  * \retval - 0 on success, appropriate error on failure
703  */
704 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
705                       struct lu_fid *glb_fid, struct lu_fid *slv_fid,
706                       __u64 *slv_ver, struct obd_uuid *uuid)
707 {
708         struct qmt_pool_info    *pool;
709         struct dt_object        *slv_obj;
710         int                      pool_type, qtype, stype;
711         bool                     created = false;
712         int                      idx, i, rc = 0;
713
714         stype = qmt_uuid2idx(uuid, &idx);
715         if (stype < 0)
716                 RETURN(stype);
717
718         /* extract pool info from global index FID */
719         rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
720         if (rc)
721                 RETURN(rc);
722
723         /* look-up pool in charge of this global index FID */
724         qti_pools_init(env);
725         pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
726         if (IS_ERR(pool))
727                 RETURN(PTR_ERR(pool));
728
729         /* look-up slave index file */
730         slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
731                                        glb_fid, uuid);
732         if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
733                 /* create slave index file */
734                 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
735                                                       pool->qpi_root, glb_fid,
736                                                       uuid, false);
737                 created = true;
738         }
739         if (IS_ERR(slv_obj)) {
740                 rc = PTR_ERR(slv_obj);
741                 CERROR("%s: failed to create quota slave index file for %s (%d)"
742                        "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
743                 GOTO(out, rc);
744         }
745
746         /* retrieve slave fid & current object version */
747         memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
748         *slv_ver = dt_version_get(env, slv_obj);
749         dt_object_put(env, slv_obj);
750         if (created)
751                 for (i = 0; i < qti_pools_cnt(env); i++)
752                         qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
753 out:
754         qti_pools_fini(env);
755         RETURN(rc);
756 }
757
758 /*
759  * Look-up a lquota_entry in the pool hash and allocate it if not found.
760  *
761  * \param env - is the environment passed by the caller
762  * \param qmt - is the quota master target for which we have to initialize the
763  *              pool configuration
764  * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
765  * \param qtype     - is the quota type, either user or group.
766  * \param qid       - is the quota ID to look-up
767  *
768  * \retval - valid pointer to lquota entry on success, appropriate error on
769  *           failure
770  */
771 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
772                                          struct qmt_device *qmt,
773                                          int pool_type, int qtype,
774                                          union lquota_id *qid,
775                                          char *pool_name)
776 {
777         struct qmt_pool_info    *pool;
778         struct lquota_entry     *lqe;
779         ENTRY;
780
781         /* look-up pool responsible for this global index FID */
782         pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
783         if (IS_ERR(pool))
784                 RETURN(ERR_CAST(pool));
785
786         if (qid->qid_uid == 0) {
787                 /* caller wants to access grace time, no need to look up the
788                  * entry since we keep a reference on ID 0 all the time */
789                 lqe = pool->qpi_grace_lqe[qtype];
790                 lqe_getref(lqe);
791                 GOTO(out, lqe);
792         }
793
794         /* now that we have the pool, let's look-up the quota entry in the
795          * right quota site */
796         lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
797 out:
798         qpi_putref(env, pool);
799         RETURN(lqe);
800 }
801
802 int qmt_pool_lqes_lookup(const struct lu_env *env,
803                          struct qmt_device *qmt,
804                          int rtype, int stype,
805                          int qtype, union lquota_id *qid,
806                          char *pool_name, int idx)
807 {
808         struct qmt_pool_info    *pool;
809         struct lquota_entry     *lqe;
810         int rc, i;
811         ENTRY;
812
813         /* Until MDT pools are not emplemented, all MDTs belong to
814          * global pool, thus lookup lqes only from global pool. */
815         if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
816                 idx = -1;
817
818         qti_pools_init(env);
819         rc = 0;
820         /* look-up pool responsible for this global index FID */
821         pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
822         if (IS_ERR(pool)) {
823                 qti_pools_fini(env);
824                 RETURN(PTR_ERR(pool));
825         }
826
827         /* now that we have the pool, let's look-up the quota entry in the
828          * right quota site */
829         qti_lqes_init(env);
830         for (i = 0; i < qti_pools_cnt(env); i++) {
831                 pool = qti_pools_env(env)[i];
832                 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
833                 if (IS_ERR(lqe)) {
834                         qti_lqes_fini(env);
835                         GOTO(out, rc = PTR_ERR(lqe));
836                 }
837                 qti_lqes_add(env, lqe);
838         }
839         LASSERT(qti_lqes_glbl(env)->lqe_is_global);
840
841 out:
842         qti_pools_fini(env);
843         RETURN(rc);
844 }
845
846 static int lqes_cmp(const void *arg1, const void *arg2)
847 {
848         const struct lquota_entry *lqe1, *lqe2;
849
850         lqe1 = *(const struct lquota_entry **)arg1;
851         lqe2 = *(const struct lquota_entry **)arg2;
852         if (lqe1->lqe_qunit > lqe2->lqe_qunit)
853                 return 1;
854         if (lqe1->lqe_qunit < lqe2->lqe_qunit)
855                 return -1;
856         return 0;
857 }
858
859 void qmt_lqes_sort(const struct lu_env *env)
860 {
861         sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
862         /* global lqe was moved during sorting */
863         if (!qti_lqes_glbl(env)->lqe_is_global) {
864                 int i;
865                 for (i = 0; i < qti_lqes_cnt(env); i++) {
866                         if (qti_lqes(env)[i]->lqe_is_global) {
867                                 qti_glbl_lqe_idx(env) = i;
868                                 break;
869                         }
870                 }
871         }
872 }
873
874 int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
875                               int rtype, int qtype, union lquota_id *qid)
876 {
877         struct qmt_pool_info    *pos;
878         struct lquota_entry     *lqe;
879         int rc = 0;
880
881         qti_lqes_init(env);
882         down_read(&qmt->qmt_pool_lock);
883         if (list_empty(&qmt->qmt_pool_list)) {
884                 up_read(&qmt->qmt_pool_lock);
885                 RETURN(-ENOENT);
886         }
887
888         list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
889                 if (pos->qpi_rtype != rtype)
890                         continue;
891                 /* Don't take into account pools without slaves */
892                 if (!qpi_slv_nr(pos, qtype))
893                         continue;
894                 lqe = lqe_find(env, pos->qpi_site[qtype], qid);
895                 /* ENOENT is valid case for lqe from non global pool
896                  * that hasn't limits, i.e. not enforced. Continue even
897                  * in case of error - we can handle already found lqes */
898                 if (IS_ERR_OR_NULL(lqe)) {
899                         /* let know that something went wrong */
900                         rc = lqe ? PTR_ERR(lqe) : -ENOENT;
901                         continue;
902                 }
903                 if (!lqe->lqe_enforced) {
904                         /* no settings for this qid_uid */
905                         lqe_putref(lqe);
906                         continue;
907                 }
908                 qti_lqes_add(env, lqe);
909                 CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
910                                  lqe, pos->qpi_name);
911         }
912         up_read(&qmt->qmt_pool_lock);
913         RETURN(rc);
914 }
915
916 /**
917  * Allocate a new pool for the specified device.
918  *
919  * Allocate a new pool_desc structure for the specified \a new_pool
920  * device to create a pool with the given \a poolname.  The new pool
921  * structure is created with a single reference, and is freed when the
922  * reference count drops to zero.
923  *
924  * \param[in] obd       Lustre OBD device on which to add a pool iterator
925  * \param[in] poolname  the name of the pool to be created
926  *
927  * \retval              0 in case of success
928  * \retval              negative error code in case of error
929  */
930 int qmt_pool_new(struct obd_device *obd, char *poolname)
931 {
932         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
933         struct qmt_pool_info *qpi;
934         struct lu_env env;
935         int rc;
936         ENTRY;
937
938         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
939                 RETURN(-ENAMETOOLONG);
940
941         rc = lu_env_init(&env, LCT_MD_THREAD);
942         if (rc) {
943                 CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
944                 RETURN(rc);
945         }
946
947         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
948         if (!IS_ERR(qpi)) {
949                 /* Valid case when several MDTs are mounted
950                  * at the same node. */
951                 CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
952                 qpi_putref(&env, qpi);
953                 GOTO(out_env, rc = -EEXIST);
954         }
955         if (PTR_ERR(qpi) != -ENOENT) {
956                 CWARN("%s: pool %s lookup failed: rc = %ld\n",
957                       obd->obd_name, poolname, PTR_ERR(qpi));
958                 GOTO(out_env, rc = PTR_ERR(qpi));
959         }
960
961         /* Now allocate and prepare only DATA pool.
962          * Further when MDT pools will be ready we need to add
963          * a cycle here and setup pools of both types. Another
964          * approach is to find out pool of which type should be
965          * created. */
966         rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
967         if (rc) {
968                 CERROR("%s: can't alloc pool %s: rc = %d\n",
969                        obd->obd_name, poolname, rc);
970                 GOTO(out_env, rc);
971         }
972
973         rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
974         if (rc) {
975                 CERROR("%s: can't prepare pool for %s: rc = %d\n",
976                        obd->obd_name, poolname, rc);
977                 GOTO(out_err, rc);
978         }
979
980         CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
981                poolname);
982
983         GOTO(out_env, rc);
984 out_err:
985         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
986         if (!IS_ERR(qpi)) {
987                 qpi_putref(&env, qpi);
988                 qpi_putref(&env, qpi);
989         }
990 out_env:
991         lu_env_fini(&env);
992         return rc;
993 }
994
995 static int
996 qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
997                struct lquota_site *site)
998 {
999         struct qmt_thread_info *qti = qmt_info(env);
1000         union lquota_id *qid = &qti->qti_id;
1001         const struct dt_it_ops *iops;
1002         struct dt_key *key;
1003         struct dt_it *it;
1004         __u64 granted;
1005         int rc;
1006         ENTRY;
1007
1008         iops = &obj->do_index_ops->dio_it;
1009
1010         it = iops->init(env, obj, 0);
1011         if (IS_ERR(it)) {
1012                 CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
1013                       PFID(&qti->qti_fid), PTR_ERR(it));
1014                 RETURN(PTR_ERR(it));
1015         }
1016
1017         rc = iops->load(env, it, 0);
1018         if (rc < 0) {
1019                 CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
1020                       PFID(&qti->qti_fid), rc);
1021                 GOTO(out, rc);
1022         } else if (rc == 0) {
1023                 rc = iops->next(env, it);
1024                 if (rc != 0)
1025                         GOTO(out, rc = (rc < 0) ? rc : 0);
1026         }
1027
1028         do {
1029                 struct lquota_entry *lqe;
1030
1031                 key = iops->key(env, it);
1032                 if (IS_ERR(key)) {
1033                         CWARN("quota: error key for "DFID": rc = %ld\n",
1034                               PFID(&qti->qti_fid), PTR_ERR(key));
1035                         GOTO(out, rc = PTR_ERR(key));
1036                 }
1037
1038                 /* skip the root user/group */
1039                 if (*((__u64 *)key) == 0)
1040                         goto next;
1041
1042                 qid->qid_uid = *((__u64 *)key);
1043
1044                 rc = qmt_slv_read(env, qid, obj, &granted);
1045                 if (!granted)
1046                         goto next;
1047
1048                 lqe = lqe_locate(env, site, qid);
1049                 if (IS_ERR(lqe))
1050                         GOTO(out, rc = PTR_ERR(lqe));
1051                 lqe_write_lock(lqe);
1052                 lqe->lqe_recalc_granted += granted;
1053                 lqe_write_unlock(lqe);
1054                 lqe_putref(lqe);
1055 next:
1056                 rc = iops->next(env, it);
1057                 if (rc < 0)
1058                         CWARN("quota: failed to parse index "DFID
1059                               ", ->next error: rc = %d\n",
1060                               PFID(&qti->qti_fid), rc);
1061         } while (rc == 0 && !kthread_should_stop());
1062
1063 out:
1064         iops->put(env, it);
1065         iops->fini(env, it);
1066         RETURN(rc);
1067 }
1068
1069 static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1070                               struct hlist_node *hnode, void *data)
1071 {
1072         struct lquota_entry     *lqe;
1073         struct lu_env *env = data;
1074
1075         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
1076         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
1077
1078         lqe_write_lock(lqe);
1079         if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
1080                 struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
1081                 struct thandle *th;
1082                 bool need_notify = false;
1083                 int rc;
1084
1085                 LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
1086                              lqe->lqe_recalc_granted);
1087                 lqe->lqe_granted = lqe->lqe_recalc_granted;
1088                 /* Always returns true, if there is no slaves in a pool */
1089                 need_notify |= qmt_adjust_qunit(env, lqe);
1090                 need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
1091                 if (need_notify) {
1092                         /* Find all lqes with lqe_id to reseed lgd array */
1093                         rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
1094                                                 lqe_qtype(lqe), &lqe->lqe_id);
1095                         if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
1096                                 qmt_seed_glbe(env,
1097                                         qti_lqes_glbl(env)->lqe_glbl_data);
1098                                 qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
1099                         }
1100                         qti_lqes_fini(env);
1101                 }
1102                 th = dt_trans_create(env, qmt->qmt_child);
1103                 if (IS_ERR(th))
1104                         goto out;
1105
1106                 rc = lquota_disk_declare_write(env, th,
1107                                                LQE_GLB_OBJ(lqe),
1108                                                &lqe->lqe_id);
1109                 if (rc)
1110                         GOTO(out_stop, rc);
1111
1112                 rc = dt_trans_start_local(env, qmt->qmt_child, th);
1113                 if (rc)
1114                         GOTO(out_stop, rc);
1115
1116                 qmt_glb_write(env, th, lqe, 0, NULL);
1117 out_stop:
1118                 dt_trans_stop(env, qmt->qmt_child, th);
1119         }
1120 out:
1121         lqe->lqe_recalc_granted = 0;
1122         lqe_write_unlock(lqe);
1123
1124         return 0;
1125 }
1126
1127 #define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
1128 static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
1129 {
1130         char mdt_name[MDT_DEV_NAME_LEN];
1131         struct lustre_mount_info *lmi;
1132         struct obd_device *obd;
1133         int rc;
1134         ENTRY;
1135
1136         rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
1137         if (rc) {
1138                 CERROR("quota: cannot get server name from %s: rc = %d\n",
1139                        qmt->qmt_svname, rc);
1140                 RETURN(ERR_PTR(rc));
1141         }
1142
1143         strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
1144         lmi = server_get_mount(mdt_name);
1145         if (lmi == NULL) {
1146                 rc = -ENOENT;
1147                 CERROR("%s: cannot get mount info from %s: rc = %d\n",
1148                        qmt->qmt_svname, mdt_name, rc);
1149                 RETURN(ERR_PTR(rc));
1150         }
1151         obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
1152         lustre_put_lsi(lmi->lmi_sb);
1153
1154         RETURN(obd);
1155 }
1156
1157 static int qmt_pool_recalc(void *args)
1158 {
1159         struct qmt_pool_info *pool, *glbl_pool;
1160         struct rw_semaphore *sem = NULL;
1161         struct obd_device *obd;
1162         struct lu_env env;
1163         int i, rc, qtype, slaves_cnt;
1164         ENTRY;
1165
1166         pool = args;
1167
1168         rc = lu_env_init(&env, LCT_MD_THREAD);
1169         if (rc) {
1170                 CERROR("%s: cannot init env: rc = %d\n",
1171                        pool->qpi_qmt->qmt_svname, rc);
1172                 GOTO(out, rc);
1173         }
1174
1175         obd = qmt_get_mgc(pool->qpi_qmt);
1176         if (IS_ERR(obd))
1177                 GOTO(out, rc = PTR_ERR(obd));
1178         else
1179                 /* Waiting for the end of processing mgs config.
1180                  * It is needed to be sure all pools are configured. */
1181                 while (obd->obd_process_conf)
1182                         schedule_timeout_uninterruptible(cfs_time_seconds(1));
1183
1184         OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val);
1185         sem = qmt_sarr_rwsem(pool);
1186         LASSERT(sem);
1187         down_read(sem);
1188         /* Hold this to be sure that OSTs from this pool
1189          * can't do acquire/release.
1190          *
1191          * I guess below write semaphore could be a bottleneck
1192          * as qmt_dqacq would be blocked trying to hold
1193          * read_lock at qmt_pool_lookup->qti_pools_add.
1194          * But on the other hand adding/removing OSTs to the pool is
1195          * a rare operation. If finally this would be a problem,
1196          * we can consider another approach. For example we can
1197          * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
1198          * and go through appropriate OSTs. I don't use this approach now
1199          * as newly created pool hasn't lqes entries. So firstly we need
1200          * to get this lqes from the global pool index file. This
1201          * solution looks more complex, so leave it as it is. */
1202         down_write(&pool->qpi_recalc_sem);
1203
1204         glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
1205         if (IS_ERR(glbl_pool))
1206                 GOTO(out, rc = PTR_ERR(glbl_pool));
1207
1208         slaves_cnt = qmt_sarr_count(pool);
1209         CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
1210                slaves_cnt, pool->qpi_name);
1211
1212         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1213                 for (i = 0; i < slaves_cnt; i++) {
1214                         struct qmt_thread_info  *qti = qmt_info(&env);
1215                         struct dt_object *slv_obj;
1216                         struct obd_uuid uuid;
1217                         int idx;
1218
1219                         if (kthread_should_stop())
1220                                 GOTO(out_stop, rc = 0);
1221                         idx = qmt_sarr_get_idx(pool, i);
1222                         LASSERT(idx >= 0);
1223
1224                         /* We don't need fsname here - anyway
1225                          * lquota_disk_slv_filename ignores it. */
1226                         snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1227                         lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1228                                             qtype);
1229                         /* look-up index file associated with acquiring slave */
1230                         slv_obj = lquota_disk_slv_find(&env,
1231                                                 glbl_pool->qpi_qmt->qmt_child,
1232                                                 glbl_pool->qpi_root,
1233                                                 &qti->qti_fid,
1234                                                 &uuid);
1235                         if (IS_ERR(slv_obj))
1236                                 GOTO(out_stop, rc = PTR_ERR(slv_obj));
1237
1238                         CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
1239                                slv_obj, uuid.uuid);
1240                         qmt_obj_recalc(&env, slv_obj, pool->qpi_site[qtype]);
1241                         dt_object_put(&env, slv_obj);
1242                 }
1243                 /* Now go trough the site hash and compare lqe_granted
1244                  * with lqe_calc_granted. Write new value if disagree */
1245
1246                 cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
1247                                   qmt_site_recalc_cb, &env);
1248         }
1249         GOTO(out_stop, rc);
1250 out_stop:
1251         qpi_putref(&env, glbl_pool);
1252 out:
1253         if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
1254                 /*
1255                  * Someone is waiting for us to stop - be sure not to exit
1256                  * before kthread_stop() gets a ref on the task.  No event
1257                  * will happen on 'pool, this is just a convenient way to
1258                  * wait.
1259                  */
1260                 wait_var_event(pool, kthread_should_stop());
1261
1262         clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
1263         /* Pool can't be changed, since sem has been down.
1264          * Thus until up_read, no one can restart recalc thread.
1265          */
1266         if (sem) {
1267                 up_read(sem);
1268                 up_write(&pool->qpi_recalc_sem);
1269         }
1270
1271         /* qpi_getref has been called in qmt_start_pool_recalc,
1272          * however we can't call qpi_putref if lu_env_init failed.
1273          */
1274         if (env.le_ctx.lc_state == LCS_ENTERED) {
1275                 qpi_putref(&env, pool);
1276                 lu_env_fini(&env);
1277         }
1278
1279         return rc;
1280 }
1281
1282 static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
1283 {
1284         struct task_struct *task;
1285         int rc = 0;
1286
1287         if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
1288                 LASSERT(!qpi->qpi_recalc_task);
1289
1290                 qpi_getref(qpi);
1291                 task = kthread_create(qmt_pool_recalc, qpi,
1292                                       "qsd_reint_%s", qpi->qpi_name);
1293                 if (IS_ERR(task)) {
1294                         clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
1295                         rc = PTR_ERR(task);
1296                         qpi_putref(env, qpi);
1297                 } else {
1298                         qpi->qpi_recalc_task = task;
1299                         /* Using park/unpark to start the thread ensures that
1300                          * the thread function does get calls, so the
1301                          * ref on qpi will be dropped
1302                          */
1303                         kthread_park(task);
1304                         kthread_unpark(task);
1305                 }
1306         }
1307
1308         RETURN(rc);
1309 }
1310
1311 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
1312 {
1313         struct task_struct *task;
1314
1315         task = xchg(&qpi->qpi_recalc_task, NULL);
1316         if (task)
1317                 kthread_stop(task);
1318 }
1319
1320 static int qmt_pool_slv_nr_change(const struct lu_env *env,
1321                                   struct qmt_pool_info *pool,
1322                                   int idx, bool add)
1323 {
1324         struct qmt_pool_info *glbl_pool;
1325         int qtype;
1326
1327         glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
1328         if (IS_ERR(glbl_pool))
1329                 RETURN(PTR_ERR(glbl_pool));
1330
1331         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1332                 struct qmt_thread_info  *qti = qmt_info(env);
1333                 struct dt_object *slv_obj;
1334                 struct obd_uuid uuid;
1335
1336                 /* We don't need fsname here - anyway
1337                  * lquota_disk_slv_filename ignores it. */
1338                 snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1339                 lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1340                                     qtype);
1341                 /* look-up index file associated with acquiring slave */
1342                 slv_obj = lquota_disk_slv_find(env,
1343                                         glbl_pool->qpi_qmt->qmt_child,
1344                                         glbl_pool->qpi_root,
1345                                         &qti->qti_fid,
1346                                         &uuid);
1347                 if (IS_ERR(slv_obj))
1348                         continue;
1349
1350                 if (add)
1351                         pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
1352                 else
1353                         pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
1354                 dt_object_put(env, slv_obj);
1355         }
1356         qpi_putref(env, glbl_pool);
1357
1358         return 0;
1359 }
1360
1361 static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
1362                             char *slavename, bool add)
1363 {
1364         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1365         struct qmt_pool_info    *qpi;
1366         struct lu_env            env;
1367         int                      rc, idx;
1368         ENTRY;
1369
1370         if (qmt->qmt_stopping)
1371                 RETURN(0);
1372
1373         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1374                 RETURN(-ENAMETOOLONG);
1375
1376         CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
1377                               "%s: pool %s, removing %s\n",
1378               obd->obd_name, poolname, slavename);
1379
1380         rc = server_name2index(slavename, &idx, NULL);
1381         if (rc != LDD_F_SV_TYPE_OST)
1382                 RETURN(-EINVAL);
1383
1384         rc = lu_env_init(&env, LCT_MD_THREAD);
1385         if (rc) {
1386                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1387                 RETURN(rc);
1388         }
1389
1390         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1391         if (IS_ERR(qpi)) {
1392                 CWARN("%s: can't find pool %s: rc = %long\n",
1393                       obd->obd_name, poolname, PTR_ERR(qpi));
1394                 GOTO(out, rc = PTR_ERR(qpi));
1395         }
1396
1397         rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
1398                    qmt_sarr_pool_rem(qpi, idx);
1399         if (rc) {
1400                 CERROR("%s: can't %s %s pool %s: rc = %d\n",
1401                        add ? "add to" : "remove", obd->obd_name,
1402                        slavename, poolname, rc);
1403                 GOTO(out_putref, rc);
1404         }
1405         qmt_pool_slv_nr_change(&env, qpi, idx, add);
1406         qmt_start_pool_recalc(&env, qpi);
1407
1408 out_putref:
1409         qpi_putref(&env, qpi);
1410 out:
1411         lu_env_fini(&env);
1412         RETURN(rc);
1413 }
1414
1415
1416
1417 /**
1418  * Add a single target device to the named pool.
1419  *
1420  * \param[in] obd       OBD device on which to add the pool
1421  * \param[in] poolname  name of the pool to which to add the target \a slavename
1422  * \param[in] slavename name of the target device to be added
1423  *
1424  * \retval              0 if \a slavename was (previously) added to the pool
1425  * \retval              negative error number on failure
1426  */
1427 int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
1428 {
1429         return qmt_pool_add_rem(obd, poolname, slavename, true);
1430 }
1431
1432 /**
1433  * Remove the named target from the specified pool.
1434  *
1435  * \param[in] obd       OBD device from which to remove \a poolname
1436  * \param[in] poolname  name of the pool to be changed
1437  * \param[in] slavename name of the target to remove from \a poolname
1438  *
1439  * \retval              0 on successfully removing \a slavename from the pool
1440  * \retval              negative number on error (e.g. \a slavename not in pool)
1441  */
1442 int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
1443 {
1444         return qmt_pool_add_rem(obd, poolname, slavename, false);
1445 }
1446
1447 /**
1448  * Remove the named pool from the QMT device.
1449  *
1450  * \param[in] obd       OBD device on which pool was previously created
1451  * \param[in] poolname  name of pool to remove from \a obd
1452  *
1453  * \retval              0 on successfully removing the pool
1454  * \retval              negative error numbers for failures
1455  */
1456 int qmt_pool_del(struct obd_device *obd, char *poolname)
1457 {
1458         struct qmt_device       *qmt = lu2qmt_dev(obd->obd_lu_dev);
1459         struct qmt_pool_info    *qpi;
1460         struct lu_fid            fid;
1461         char                     buf[LQUOTA_NAME_MAX];
1462         struct lu_env            env;
1463         int                      rc;
1464         int                      qtype;
1465         ENTRY;
1466
1467         if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1468                 RETURN(-ENAMETOOLONG);
1469
1470         CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
1471                poolname);
1472
1473         rc = lu_env_init(&env, LCT_MD_THREAD);
1474         if (rc) {
1475                 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1476                 RETURN(rc);
1477         }
1478
1479         /* look-up pool in charge of this global index FID */
1480         qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1481         if (IS_ERR(qpi)) {
1482                 /* Valid case for several MDTs at the same node -
1483                  * pool removed by the 1st MDT in config */
1484                 CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
1485                 lu_env_fini(&env);
1486                 RETURN(PTR_ERR(qpi));
1487         }
1488
1489         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1490                 lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
1491                 snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
1492                 rc = local_object_unlink(&env, qmt->qmt_child,
1493                                          qpi->qpi_root, buf);
1494                 if (rc)
1495                         CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
1496                               obd->obd_name, buf, poolname, rc);
1497         }
1498
1499         /* put ref from look-up */
1500         qpi_putref(&env, qpi);
1501         /* put last ref to free qpi */
1502         qpi_putref(&env, qpi);
1503
1504         snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
1505                  RES_NAME(LQUOTA_RES_DT), poolname);
1506         rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
1507         if (rc)
1508                 CWARN("%s: cannot unlink dir %s: rc = %d\n",
1509                       obd->obd_name, poolname, rc);
1510
1511         lu_env_fini(&env);
1512         RETURN(0);
1513 }
1514
1515 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
1516 {
1517
1518         /* No need to initialize sarray for global pool
1519          * as it always includes all slaves */
1520         if (qmt_pool_global(qpi))
1521                 return 0;
1522
1523         switch (qpi->qpi_rtype) {
1524         case LQUOTA_RES_DT:
1525                 return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
1526         case LQUOTA_RES_MD:
1527         default:
1528                 return 0;
1529         }
1530 }
1531
1532 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
1533 {
1534         switch (qpi->qpi_rtype) {
1535         case LQUOTA_RES_DT:
1536                 return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
1537         case LQUOTA_RES_MD:
1538         default:
1539                 return 0;
1540         }
1541 }
1542
1543 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
1544 {
1545         switch (qpi->qpi_rtype) {
1546         case LQUOTA_RES_DT:
1547                 return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
1548         case LQUOTA_RES_MD:
1549         default:
1550                 return 0;
1551         }
1552 }
1553
1554 static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi)
1555 {
1556         if (qmt_pool_global(qpi))
1557                 return;
1558
1559         switch (qpi->qpi_rtype) {
1560         case LQUOTA_RES_DT:
1561                 if (qpi->qpi_sarr.osts.op_array)
1562                         lu_tgt_pool_free(&qpi->qpi_sarr.osts);
1563                 return;
1564         case LQUOTA_RES_MD:
1565         default:
1566                 return;
1567         }
1568 }
1569
1570 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
1571 {
1572         if (qmt_pool_global(qpi))
1573                 return 0;
1574
1575         switch (qpi->qpi_rtype) {
1576         case LQUOTA_RES_DT:
1577                 return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
1578         case LQUOTA_RES_MD:
1579         default:
1580                 return 0;
1581         }
1582 }
1583
1584 struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
1585 {
1586         switch (qpi->qpi_rtype) {
1587         case LQUOTA_RES_DT:
1588                 /* to protect ost_pool use */
1589                 return &qpi->qpi_sarr.osts.op_rw_sem;
1590         case LQUOTA_RES_MD:
1591         default:
1592                 return NULL;
1593         }
1594 }
1595
1596 int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
1597 {
1598
1599         if (qmt_pool_global(qpi))
1600                 return arr_idx;
1601
1602         switch (qpi->qpi_rtype) {
1603         case LQUOTA_RES_DT:
1604                 LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
1605                          "idx invalid %d op_count %d\n", arr_idx,
1606                          qpi->qpi_sarr.osts.op_count);
1607                 return qpi->qpi_sarr.osts.op_array[arr_idx];
1608         case LQUOTA_RES_MD:
1609         default:
1610                 return -EINVAL;
1611         }
1612 }
1613
1614 /* Number of slaves in a pool */
1615 unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
1616 {
1617         switch (qpi->qpi_rtype) {
1618         case LQUOTA_RES_DT:
1619                 return qpi->qpi_sarr.osts.op_count;
1620         case LQUOTA_RES_MD:
1621         default:
1622                 return -EINVAL;
1623         }
1624 }