4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2012, 2017, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 * A Quota Master Target has a list(qmt_pool_list) where it stores qmt_pool_info
33 * structures. There is one such structure for each pool managed by the QMT.
35 * Each pool can have different quota types enforced (typically user & group
36 * quota). A pool is in charge of managing lquota_entry structures for each
37 * quota type. This is done by creating one lquota_entry site per quota
38 * type. A site stores entries in a hash table and read quota settings from disk
39 * when a given ID isn't present in the hash.
41 * The pool API exported here is the following:
42 * - qmt_pool_init(): initializes the general QMT structures used to manage
44 * - qmt_pool_fini(): frees the structures allocated by qmt_pool_init().
45 * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46 * - qmt_pool_new_conn(): is used to create a new slave index file.
47 * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
51 #define DEBUG_SUBSYSTEM S_LQUOTA
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
/*
 * Forward declarations for the per-pool target index array ("sarr")
 * helpers implemented later in this file.  qmt_sarr_pool_add() and
 * qmt_sarr_pool_add_locked() both map to _qmt_sarr_pool_add(); the last
 * argument tells whether the caller already holds the array lock.
 * NOTE(review): the 3rd prototype parameter is named "min" while the
 * macro callers pass a slave type ("stype") -- confirm intended naming.
 */
57 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
58 #define qmt_sarr_pool_add(qpi, idx, stype) \
59 _qmt_sarr_pool_add(qpi, idx, stype, false)
60 #define qmt_sarr_pool_add_locked(qpi, idx, stype) \
61 _qmt_sarr_pool_add(qpi, idx, stype, true)
62 static inline int _qmt_sarr_pool_add(struct qmt_pool_info *qpi,
63 int idx, int min, bool locked);
64 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
65 static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi);
66 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
67 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
70 * Static helper functions not used outside the scope of this file
/*
 * Drop one pool reference when the caller guarantees at least one more
 * reference remains (hence the LASSERT > 1): this can never trigger
 * pool teardown, so it is safe to call with locks held.
 */
73 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
75 LASSERT(atomic_read(&pool->qpi_ref) > 1);
76 atomic_dec(&pool->qpi_ref);
79 /* some procfs helpers */
/*
 * Seq_file show handler: dump general pool state (resource type name,
 * refcount, least qunit) followed by per-quota-type counters of slaves
 * and in-memory lquota entries.
 */
80 static int qpi_state_seq_show(struct seq_file *m, void *data)
82 struct qmt_pool_info *pool = m->private;
85 LASSERT(pool != NULL);
/* nothing to report until the pool finished initialization */
86 if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
89 seq_printf(m, "pool:\n"
93 " least_qunit: %lu\n",
95 RES_NAME(pool->qpi_rtype),
96 atomic_read(&pool->qpi_ref),
97 pool->qpi_least_qunit);
99 for (type = 0; type < LL_MAXQUOTAS; type++)
100 seq_printf(m, " %s:\n"
101 " quota_servers: %d\n"
102 " quota_entries: %d\n",
104 qpi_slv_nr(pool, type),
105 atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
109 LPROC_SEQ_FOPS_RO(qpi_state);
/*
 * Seq_file show handler: print the pool's current soft least qunit.
 */
111 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
113 struct qmt_pool_info *pool = m->private;
114 LASSERT(pool != NULL);
115 if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
118 seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
/*
 * Procfs write handler: tune qpi_soft_least_qunit (block quota pools
 * only).  The user-supplied value is rounded so it stays a power-of-4
 * multiple of the pool's least qunit, matching the adjustment scheme in
 * qmt_adjust_qunit().
 */
123 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
124 size_t count, loff_t *off)
126 struct seq_file *m = file->private_data;
127 struct qmt_pool_info *pool = m->private;
128 long long least_qunit, qunit;
131 LASSERT(pool != NULL);
132 if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
135 /* Not tuneable for inode limit */
136 if (pool->qpi_rtype != LQUOTA_RES_DT)
139 rc = kstrtoll_from_user(buffer, count, 0, &least_qunit);
143 /* Minimal qpi_soft_least_qunit */
144 qunit = pool->qpi_least_qunit << 2;
145 /* The value must be power of minimal qpi_soft_least_qunit, see
146 * how the qunit is adjusted in qmt_adjust_qunit(). */
147 while (qunit > 0 && qunit < least_qunit)
/* overflow guard: clamp to the largest int value divisible by 4 */
150 qunit = INT_MAX & ~3;
152 pool->qpi_soft_least_qunit = qunit;
155 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
/* procfs files exported under each pool's proc directory */
157 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
159 .fops = &qpi_state_fops },
160 { .name = "soft_least_qunit",
161 .fops = &qpi_soft_least_qunit_fops },
166 * Allocate a new qmt_pool_info structure and add it to qmt_pool_list.
168 * \param env - is the environment passed by the caller
169 * \param qmt - is the quota master target
 * \param pool_name - is the name of the pool to create
170 * \param pool_type - is the resource type of this pool instance, either
171 * LQUOTA_RES_MD or LQUOTA_RES_DT.
173 * \retval - 0 on success, appropriate error on failure
175 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
176 char *pool_name, int pool_type)
178 struct qmt_thread_info *qti = qmt_info(env);
179 struct qmt_pool_info *pool;
186 INIT_LIST_HEAD(&pool->qpi_linkage);
187 init_rwsem(&pool->qpi_recalc_sem);
189 pool->qpi_rtype = pool_type;
192 /* initialize refcount to 1, hash table will then grab an additional
194 atomic_set(&pool->qpi_ref, 1);
196 /* set up least qunit size to use for this pool */
197 pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
/* soft least qunit starts at 4x the least qunit for block quota */
198 if (pool_type == LQUOTA_RES_DT)
199 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
201 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
203 /* grab reference on master target that this pool belongs to */
204 lu_device_get(qmt2lu_dev(qmt));
205 lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
208 /* create pool proc directory */
209 snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
210 RES_NAME(pool_type), pool_name);
/* NOTE(review): strncpy may leave qpi_name unterminated when
 * pool_name is exactly QPI_MAXNAME long -- confirm the buffer is
 * sized/zeroed to guarantee NUL termination */
211 strncpy(pool->qpi_name, pool_name, QPI_MAXNAME);
212 pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
213 lprocfs_quota_qpi_vars, pool);
214 if (IS_ERR(pool->qpi_proc)) {
215 rc = PTR_ERR(pool->qpi_proc);
216 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
217 qmt->qmt_svname, qti->qti_buf, rc);
/* procfs failure is not fatal; continue without proc entry */
218 pool->qpi_proc = NULL;
222 rc = qmt_sarr_pool_init(pool);
226 /* add to qmt pool list */
227 down_write(&qmt->qmt_pool_lock);
228 list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
229 up_write(&qmt->qmt_pool_lock);
233 /* this frees the pool structure since refcount is equal to 1 */
234 qpi_putref(env, pool);
239 * Delete a qmt_pool_info instance and all structures associated.
 * Called once the last reference on the pool is dropped: unlinks the
 * pool from qmt_pool_list, stops recalc, removes procfs entries and
 * releases all per-quota-type sites/objects and the QMT reference.
241 * \param env - is the environment passed by the caller
242 * \param pool - is the qmt_pool_info structure to free
244 void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
246 struct qmt_device *qmt = pool->qpi_qmt;
250 /* remove from list */
251 down_write(&qmt->qmt_pool_lock);
252 list_del_init(&pool->qpi_linkage);
253 up_write(&qmt->qmt_pool_lock);
255 if (atomic_read(&pool->qpi_ref) > 0)
258 qmt_stop_pool_recalc(pool);
259 qmt_sarr_pool_free(pool);
261 /* release proc entry */
262 if (pool->qpi_proc) {
263 lprocfs_remove(&pool->qpi_proc);
264 pool->qpi_proc = NULL;
267 /* release per-quota type site used to manage quota entries as well as
268 * references to global index files */
269 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
270 /* release lqe storing grace time */
271 if (pool->qpi_grace_lqe[qtype] != NULL)
272 lqe_putref(pool->qpi_grace_lqe[qtype]);
275 if (pool->qpi_site[qtype] != NULL &&
276 !IS_ERR(pool->qpi_site[qtype]))
277 lquota_site_free(env, pool->qpi_site[qtype]);
278 /* release reference to global index */
279 if (pool->qpi_glb_obj[qtype] != NULL &&
280 !IS_ERR(pool->qpi_glb_obj[qtype]))
281 dt_object_put(env, pool->qpi_glb_obj[qtype]);
284 /* release reference on pool directory */
285 if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
286 dt_object_put(env, pool->qpi_root);
288 /* release reference on the master target */
289 if (pool->qpi_qmt != NULL) {
290 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
292 lu_ref_del(&ld->ld_reference, "pool", pool);
294 pool->qpi_qmt = NULL;
297 LASSERT(list_empty(&pool->qpi_linkage));
/*
 * Reset the per-thread pool array before a lookup: no pools collected
 * yet and capacity back to the small embedded array size.
 */
301 static inline void qti_pools_init(const struct lu_env *env)
303 struct qmt_thread_info *qti = qmt_info(env);
305 qti->qti_pools_cnt = 0;
306 qti->qti_pools_num = QMT_MAX_POOL_NUM;
/*
 * Accessors that pick either the dynamically allocated array
 * (qti_pools, used once the capacity has outgrown QMT_MAX_POOL_NUM)
 * or the small embedded one (qti_pools_small).
 */
309 #define qti_pools(qti) (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
310 qti->qti_pools : qti->qti_pools_small)
311 #define qti_pools_env(env) \
312 (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
313 qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
314 #define qti_pools_cnt(env) (qmt_info(env)->qti_pools_cnt)
/*
 * Append a pool to the per-thread pool array, growing the array by
 * doubling when full.  Takes qpi_recalc_sem (read) to protect the
 * pool's lqes against the recalculation thread; it is released in
 * qti_pools_fini().  The global pool is kept at index 0.
 */
316 static inline int qti_pools_add(const struct lu_env *env,
317 struct qmt_pool_info *qpi)
319 struct qmt_thread_info *qti = qmt_info(env);
320 struct qmt_pool_info **pools = qti->qti_pools;
322 pools = qti_pools(qti)
;
323 LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
324 "Forgot init? %p\n", qti);
/* grow: double the capacity and copy the collected pointers over */
326 if (qti->qti_pools_cnt >= qti->qti_pools_num) {
327 OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
330 memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
331 /* Don't need to free, if it is the very 1st allocation */
332 if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
333 OBD_FREE(qti->qti_pools,
334 qti->qti_pools_num * sizeof(qpi));
335 qti->qti_pools = pools;
336 qti->qti_pools_num *= 2;
340 /* Take this to protect pool's lqes against changing by
341 * recalculation thread. This would be unlocked at
 * qti_pools_fini(). */
343 down_read(&qpi->qpi_recalc_sem);
344 if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
345 pools[qti->qti_pools_cnt++] = pools[0];
346 /* Store global pool always at index 0 */
349 pools[qti->qti_pools_cnt++] = qpi;
352 CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
353 qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
/*
 * Release everything collected by qti_pools_add(): drop the recalc
 * semaphore and the reference of every pool, then free the dynamic
 * array if one was allocated.
 */
358 static inline void qti_pools_fini(const struct lu_env *env)
360 struct qmt_thread_info *qti = qmt_info(env);
361 struct qmt_pool_info **pools = qti->qti_pools;
364 LASSERT(qti->qti_pools_cnt > 0);
366 pools = qti_pools(qti);
367 for (i = 0; i < qti->qti_pools_cnt; i++) {
368 up_read(&pools[i]->qpi_recalc_sem);
369 qpi_putref(env, pools[i]);
372 if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
373 OBD_FREE(qti->qti_pools,
374 qti->qti_pools_num * sizeof(struct qmt_pool_info *));
378 * Look-up a pool in a list based on the type.
380 * \param env - is the environment passed by the caller
381 * \param qmt - is the quota master target
382 * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
 * LQUOTA_RES_DT.
384 * \param pool_name - is the pool name to search for
385 * \param idx - OST or MDT index to search for. When it is >= 0, function
386 * returns array with pointers to all pools that include
387 * targets with requested index.
388 * \param add - add to qti_pool_arr if true
390 struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
391 struct qmt_device *qmt,
396 struct qmt_pool_info *pos, *pool;
400 down_read(&qmt->qmt_pool_lock);
401 if (list_empty(&qmt->qmt_pool_list)) {
402 up_read(&qmt->qmt_pool_lock);
403 RETURN(ERR_PTR(-ENOENT));
406 CDEBUG(D_QUOTA, "type %d name %s index %d\n",
407 rtype, pool_name ?: "<none>", idx);
408 /* Now just find a pool with correct type in a list. Further we need
409 * to go through the list and find a pool that includes requested OST
410 * or MDT. Possibly this would return a list of pools that includes
411 * needed target(OST/MDT). */
/* no name and no index means the caller wants the global pool */
413 if (idx == -1 && !pool_name)
414 pool_name = GLB_POOL_NAME;
416 list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
417 if (pos->qpi_rtype != rtype)
/* idx mode: collect every pool containing this target index */
420 if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
421 rc = qti_pools_add(env, pos);
427 if (pool_name && !strncmp(pool_name, pos->qpi_name,
431 rc = qti_pools_add(env, pos);
440 up_read(&qmt->qmt_pool_lock);
/* in idx mode the global pool (index 0 of the array) is returned */
445 if (idx >= 0 && qti_pools_cnt(env))
446 pool = qti_pools_env(env)[0];
448 RETURN(pool ? : ERR_PTR(-ENOENT));
450 CERROR("%s: cannot add pool %s: err = %d\n",
451 qmt->qmt_svname, pos->qpi_name, rc);
456 * Functions implementing the pool API, used by the qmt handlers
460 * Destroy all pools which are still in the pool list.
462 * \param env - is the environment passed by the caller
463 * \param qmt - is the quota master target
466 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
468 struct qmt_pool_info *pool, *tmp;
471 /* parse list of pool and destroy each element */
472 list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
473 /* stop all recalc threads - it may hold qpi reference */
474 qmt_stop_pool_recalc(pool);
475 /* release extra reference taken in qmt_pool_alloc */
476 qpi_putref(env, pool);
/* every pool must have unlinked itself on its final putref */
478 LASSERT(list_empty(&qmt->qmt_pool_list));
484 * Initialize pool configure for the quota master target. For now, we only
485 * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
486 * pool which are instantiated in this function.
488 * \param env - is the environment passed by the caller
489 * \param qmt - is the quota master target for which we have to initialize the
 * pool configuration
492 * \retval - 0 on success, appropriate error on failure
494 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
499 INIT_LIST_HEAD(&qmt->qmt_pool_list);
500 init_rwsem(&qmt->qmt_pool_lock);
502 /* Instantiate pool master for the default data and metadata pool.
503 * This code will have to be revisited once we support quota on
504 * non-default pools */
505 for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
506 rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
/* on failure tear down whatever pools were created so far */
512 qmt_pool_fini(env, qmt);
/*
 * Per-slave callback used by lquota_disk_for_each_slv() during pool
 * setup: derive the slave's target index/type from its uuid, record the
 * index in the pool's target array and bump the slave counter for the
 * quota type encoded in the global index FID.
 */
517 static int qmt_slv_add(const struct lu_env *env, struct lu_fid *glb_fid,
518 char *slv_name, struct lu_fid *slv_fid, void *arg)
520 struct obd_uuid uuid;
521 struct qmt_pool_info *qpi = arg;
522 int stype, qtype, idx;
525 rc = lquota_extract_fid(glb_fid, NULL, &qtype);
528 obd_str2uuid(&uuid, slv_name);
529 stype = qmt_uuid2idx(&uuid, &idx);
533 CDEBUG(D_QUOTA, "add new idx:%d in %s\n", idx, qpi->qpi_name);
/* -EEXIST just means the target was already recorded; not an error */
534 rc = qmt_sarr_pool_add(qpi, idx, stype);
535 if (rc && rc != -EEXIST) {
536 CERROR("%s: can't add idx %d into dt-0x0: rc = %d\n",
537 qpi->qpi_qmt->qmt_svname, idx, rc);
542 qpi->qpi_slv_nr[stype][qtype]++;
543 CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
544 slv_name, stype, qtype, qpi->qpi_slv_nr[stype][qtype]);
550 * Set up on-disk index files associated with each pool.
 * For every matching pool this creates the pool directory, then for each
 * quota type: creates/opens the global index, seeds default grace time
 * and version on a fresh index, allocates the lquota site, counts
 * previously connected slaves and caches the grace-time lqe (ID 0).
552 * \param env - is the environment passed by the caller
553 * \param qmt - is the quota master target for which we have to initialize the
 * on-disk indexes
555 * \param qmt_root - is the on-disk directory created for the QMT.
556 * \param name - is the pool name that we need to setup. Setup all pools
557 * in qmt_pool_list when name is NULL.
559 * \retval - 0 on success, appropriate error on failure
561 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
562 struct dt_object *qmt_root, char *name)
564 struct qmt_thread_info *qti = qmt_info(env);
565 struct lquota_glb_rec *rec = &qti->qti_glb_rec;
566 struct qmt_pool_info *pool;
567 struct dt_device *dev = NULL;
568 dt_obj_version_t version;
569 struct list_head *pos;
570 int rc = 0, i, qtype;
573 /* iterate over each pool in the list and allocate a quota site for each
574 * one. This involves creating a global index file on disk */
575 list_for_each(pos, &qmt->qmt_pool_list) {
576 struct dt_object *obj;
577 struct lquota_entry *lqe;
581 pool = list_entry(pos, struct qmt_pool_info,
/* when a name is given, prepare only the matching pool */
584 pool_name = pool->qpi_name;
585 if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
587 rtype = pool->qpi_rtype;
589 dev = pool->qpi_qmt->qmt_child;
591 /* allocate directory for this pool */
592 snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
593 RES_NAME(rtype), pool_name);
594 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
597 RETURN(PTR_ERR(obj));
598 pool->qpi_root = obj;
600 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
601 /* Generating FID of global index in charge of storing
602 * settings for this quota type */
603 lquota_generate_fid(&qti->qti_fid, rtype, qtype);
605 /* open/create the global index file for this quota
606 * type. If name is set, it means we came here from
607 * qmt_pool_new and can create glb index with a
608 * local generated FID. */
609 obj = lquota_disk_glb_find_create(env, dev,
612 name ? true : false);
615 CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
616 qmt->qmt_svname, qtype_name(qtype), rc);
620 pool->qpi_glb_obj[qtype] = obj;
622 version = dt_version_get(env, obj);
623 /* set default grace time for newly created index */
625 rec->qbr_hardlimit = 0;
626 rec->qbr_softlimit = 0;
627 rec->qbr_granted = 0;
628 rec->qbr_time = rtype == LQUOTA_RES_MD ?
629 MAX_IQ_TIME : MAX_DQ_TIME;
/* record 0 of the global index holds the grace time */
631 rc = lquota_disk_write_glb(env, obj, 0, rec);
633 CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
634 qmt->qmt_svname, qtype_name(qtype), rc);
638 rc = lquota_disk_update_ver(env, dev, obj, 1);
640 CERROR("%s: failed to set initial version for %s type: rc = %d\n",
641 qmt->qmt_svname, qtype_name(qtype), rc);
646 /* create quota entry site for this quota type */
647 pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
650 if (IS_ERR(pool->qpi_site[qtype])) {
651 rc = PTR_ERR(pool->qpi_site[qtype]);
652 CERROR("%s: failed to create site for %s type: rc = %d\n",
653 qmt->qmt_svname, qtype_name(qtype), rc);
657 /* count number of slaves which already connected to
658 * the master in the past */
659 for (i = 0; i < QMT_STYPE_CNT; i++)
660 pool->qpi_slv_nr[i][qtype] = 0;
662 rc = lquota_disk_for_each_slv(env, pool->qpi_root,
667 CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
668 qmt->qmt_svname, qtype_name(qtype), rc);
672 /* Global grace time is stored in quota settings of
 * ID 0 */
674 qti->qti_id.qid_uid = 0;
676 /* look-up quota entry storing grace time */
677 lqe = lqe_locate(env, pool->qpi_site[qtype],
680 RETURN(PTR_ERR(lqe));
681 pool->qpi_grace_lqe[qtype] = lqe;
682 #ifdef CONFIG_PROC_FS
683 /* add procfs file to dump the global index, mostly for
684 * debugging purpose */
685 snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
686 "glb-%s", qtype_name(qtype));
687 rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
688 0444, &lprocfs_quota_seq_fops,
/* procfs failure is only a warning; setup continues */
691 CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
692 qmt->qmt_svname, PFID(&qti->qti_fid), rc);
695 set_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags);
/*
 * cfs_hash for-each callback run when a new target joins a pool: extend
 * each lqe's global data array (lqe_glbl_data) with a slot for the new
 * target.  The array is grown in chunks of 16 entries; the old array is
 * freed only after lqe_glbl_data_lock is dropped.
 */
703 static int qmt_lgd_extend_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
704 struct hlist_node *hnode, void *data)
706 struct lqe_glbl_entry *lqeg_arr, *old_lqeg_arr;
707 struct lquota_entry *lqe;
710 lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
711 LASSERT(atomic_read(&lqe->lqe_ref) > 0);
714 CDEBUG(D_QUOTA, "lgd %px\n", lqe->lqe_glbl_data);
/* quick unlocked check, then re-check under the lock */
715 if (lqe->lqe_glbl_data) {
716 struct lqe_glbl_data *lgd;
719 mutex_lock(&lqe->lqe_glbl_data_lock);
720 if (lqe->lqe_glbl_data) {
721 struct qmt_pool_info *qpi = lqe2qpi(lqe);
722 int sarr_cnt = qmt_sarr_count(qpi);
724 lgd = lqe->lqe_glbl_data;
/* exactly one target was just added to the sarr */
725 if (lgd->lqeg_num_alloc < sarr_cnt) {
726 LASSERT((lgd->lqeg_num_alloc + 1) == sarr_cnt);
729 sizeof(struct lqe_glbl_entry) *
730 (lgd->lqeg_num_alloc + 16));
732 memcpy(lqeg_arr, lgd->lqeg_arr,
733 sizeof(struct lqe_glbl_entry) *
734 (lgd->lqeg_num_alloc));
735 old_lqeg_arr = lgd->lqeg_arr;
736 old_num = lgd->lqeg_num_alloc;
737 lgd->lqeg_arr = lqeg_arr;
738 lgd->lqeg_num_alloc += 16;
740 "extend lqeg_arr:%px from %d to %d\n",
742 lgd->lqeg_num_alloc);
744 CERROR("%s: cannot allocate new lqeg_arr: rc = %d\n",
745 qpi->qpi_qmt->qmt_svname,
747 GOTO(out, rc = -ENOMEM);
/* seed the new slot with the last sarr index and current state */
750 lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx =
751 qmt_sarr_get_idx(qpi, sarr_cnt - 1);
752 lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot =
754 lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit =
756 lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot_nu = 0;
757 lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit_nu = 0;
758 LQUOTA_DEBUG(lqe, "add tgt idx:%d used %d alloc %d\n",
759 lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx,
760 lgd->lqeg_num_used, lgd->lqeg_num_alloc);
761 lgd->lqeg_num_used++;
764 mutex_unlock(&lqe->lqe_glbl_data_lock);
/* free outside the lock to keep the critical section short */
765 OBD_FREE(old_lqeg_arr, old_num * sizeof(struct lqe_glbl_entry));
772 * Handle new slave connection. Called when a slave enqueues the global quota
773 * lock at the beginning of the reintegration procedure.
775 * \param env - is the environment passed by the caller
776 * \param qmt - is the quota master target handling this request
777 * \param glb_fid - is the fid of the global index file
778 * \param slv_fid - is the fid of the newly created slave index file
779 * \param slv_ver - is the current version of the slave index file
780 * \param uuid - is the uuid of slave which is (re)connecting to the master
783 * \retval - 0 on success, appropriate error on failure
785 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
786 struct lu_fid *glb_fid, struct lu_fid *slv_fid,
787 __u64 *slv_ver, struct obd_uuid *uuid)
789 struct qmt_pool_info *pool;
790 struct dt_object *slv_obj;
791 int pool_type, qtype, stype;
792 bool created = false;
795 stype = qmt_uuid2idx(uuid, &idx);
798 CDEBUG(D_QUOTA, "FID "DFID"\n", PFID(glb_fid));
800 /* extract pool info from global index FID */
801 rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
805 pool = qmt_pool_lookup_glb(env, qmt, pool_type);
807 RETURN(PTR_ERR(pool));
809 /* look-up slave index file */
810 slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
/* first connection from this slave: create its index file */
812 if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
813 /* create slave index file */
814 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
815 pool->qpi_root, glb_fid,
819 if (IS_ERR(slv_obj)) {
820 rc = PTR_ERR(slv_obj);
821 CERROR("%s: failed to create quota slave index file for %s (%d)"
822 "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
826 /* retrieve slave fid & current object version */
827 memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
828 *slv_ver = dt_version_get(env, slv_obj);
829 dt_object_put(env, slv_obj);
831 struct qmt_pool_info *ptr;
833 CDEBUG(D_QUOTA, "add tgt idx:%d pool_type:%d qtype:%d stype:%d\n",
834 idx, pool_type, qtype, stype);
/* register the new target in the pool's index array and extend
 * every lqe's per-target global data (see qmt_lgd_extend_cb) */
836 if (!qmt_dom(qtype, stype)) {
837 qmt_sarr_write_down(pool);
838 rc = qmt_sarr_pool_add_locked(pool, idx, stype);
840 for (i = 0; i < LL_MAXQUOTAS; i++)
841 cfs_hash_for_each(pool->qpi_site[i]->
845 } else if (rc == -EEXIST) {
846 /* This target has been already added
 */
851 qmt_sarr_write_up(pool);
854 CERROR("%s: cannot add idx:%d to pool %s: rc = %d\n",
855 qmt->qmt_svname, idx,
861 /* look-up pool in charge of this global index FID */
863 ptr = qmt_pool_lookup_arr(env, qmt, pool_type, idx, stype);
865 GOTO(out, rc = PTR_ERR(ptr));
/* new slave: bump the slave counter in every pool containing it */
867 for (i = 0; i < qti_pools_cnt(env); i++)
868 qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
874 qpi_putref(env, pool);
879 * Look-up a lquota_entry in the pool hash and allocate it if not found.
881 * \param env - is the environment passed by the caller
882 * \param qmt - is the quota master target for which we have to initialize the
 * quota entry
884 * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
885 * \param qtype - is the quota type, either user or group.
886 * \param qid - is the quota ID to look-up
888 * \retval - valid pointer to lquota entry on success, appropriate error on
 * failure
891 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
892 struct qmt_device *qmt,
893 int pool_type, int qtype,
894 union lquota_id *qid,
897 struct qmt_pool_info *pool;
898 struct lquota_entry *lqe;
901 /* look-up pool responsible for this global index FID */
902 pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
904 RETURN(ERR_CAST(pool));
906 if (qid->qid_uid == 0) {
907 /* caller wants to access grace time, no need to look up the
908 * entry since we keep a reference on ID 0 all the time */
909 lqe = pool->qpi_grace_lqe[qtype];
914 /* now that we have the pool, let's look-up the quota entry in the
915 * right quota site */
916 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
918 qpi_putref(env, pool);
/*
 * Fill the per-thread lqes array with the quota entries for \a qid from
 * every pool that contains target \a idx (type \a stype) of resource
 * \a rtype.  The entry of the global pool must end up in the array
 * (asserted via qti_lqes_glbl()).
 */
922 int qmt_pool_lqes_lookup(const struct lu_env *env,
923 struct qmt_device *qmt,
924 int rtype, int stype,
925 int qtype, union lquota_id *qid,
926 char *pool_name, int idx)
928 struct qmt_pool_info *pool;
929 struct lquota_entry *lqe;
935 /* look-up pool responsible for this global index FID */
936 pool = qmt_pool_lookup_arr(env, qmt, rtype, idx, stype);
939 RETURN(PTR_ERR(pool));
942 /* now that we have the pool, let's look-up the quota entry in the
943 * right quota site */
945 for (i = 0; i < qti_pools_cnt(env); i++) {
946 pool = qti_pools_env(env)[i];
947 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
950 GOTO(out, rc = PTR_ERR(lqe));
952 qti_lqes_add(env, lqe);
954 LASSERT(qti_lqes_glbl(env)->lqe_is_global);
/*
 * sort() comparator for an array of lquota_entry pointers: orders
 * entries by their qunit value.
 */
961 static int lqes_cmp(const void *arg1, const void *arg2)
963 const struct lquota_entry *lqe1, *lqe2;
965 lqe1 = *(const struct lquota_entry **)arg1;
966 lqe2 = *(const struct lquota_entry **)arg2;
967 if (lqe1->lqe_qunit > lqe2->lqe_qunit)
969 if (lqe1->lqe_qunit < lqe2->lqe_qunit)
/*
 * Sort the per-thread lqes array by qunit (lqes_cmp), then re-locate
 * the global pool's entry since sorting may have moved it away from
 * the cached index.
 */
974 void qmt_lqes_sort(const struct lu_env *env)
976 sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
977 /* global lqe was moved during sorting */
978 if (!qti_lqes_glbl(env)->lqe_is_global) {
980 for (i = 0; i < qti_lqes_cnt(env); i++) {
981 if (qti_lqes(env)[i]->lqe_is_global) {
982 qti_glbl_lqe_idx(env) = i;
/*
 * Collect into the per-thread array the already-existing (enforced)
 * lqes for \a qid across all pools of resource type \a rtype.  Unlike
 * qmt_pool_lqes_lookup() this uses lqe_find() and never instantiates
 * new entries; returns -ENOENT when nothing was collected.
 */
989 int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
990 int rtype, int qtype, union lquota_id *qid)
992 struct qmt_pool_info *pos;
993 struct lquota_entry *lqe;
996 down_read(&qmt->qmt_pool_lock);
997 if (list_empty(&qmt->qmt_pool_list)) {
998 up_read(&qmt->qmt_pool_lock);
1002 list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
1003 if (pos->qpi_rtype != rtype)
1005 /* Don't take into account pools without slaves */
1006 if (!qpi_slv_nr(pos, qtype))
1008 lqe = lqe_find(env, pos->qpi_site[qtype], qid);
1009 /* ENOENT is valid case for lqe from non global pool
1010 * that hasn't limits, i.e. not enforced. Continue even
1011 * in case of error - we can handle already found lqes */
1014 if (!lqe->lqe_enforced) {
1015 /* no settings for this qid_uid */
1019 qti_lqes_add(env, lqe);
1020 CDEBUG(D_QUOTA, "adding lqe %px from pool %s\n",
1021 lqe, pos->qpi_name);
1023 up_read(&qmt->qmt_pool_lock);
1024 RETURN(qti_lqes_cnt(env) ? 0 : -ENOENT);
1028 * Allocate a new pool for the specified device.
1030 * Allocate a new pool_desc structure for the specified \a new_pool
1031 * device to create a pool with the given \a poolname. The new pool
1032 * structure is created with a single reference, and is freed when the
1033 * reference count drops to zero.
1035 * \param[in] obd Lustre OBD device on which to add a pool iterator
1036 * \param[in] poolname the name of the pool to be created
1038 * \retval 0 in case of success
1039 * \retval negative error code in case of error
1041 int qmt_pool_new(struct obd_device *obd, char *poolname)
1043 struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
1044 struct qmt_pool_info *qpi;
1049 if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1050 RETURN(-ENAMETOOLONG);
1052 rc = lu_env_init(&env, LCT_MD_THREAD);
1054 CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
/* detect duplicate creation before allocating anything */
1058 qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1060 /* Valid case when several MDTs are mounted
1061 * at the same node. */
1062 CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
1063 qpi_putref(&env, qpi);
1064 GOTO(out_env, rc = -EEXIST);
1066 if (PTR_ERR(qpi) != -ENOENT) {
1067 CWARN("%s: pool %s lookup failed: rc = %ld\n",
1068 obd->obd_name, poolname, PTR_ERR(qpi));
1069 GOTO(out_env, rc = PTR_ERR(qpi));
1072 /* Now allocate and prepare only DATA pool.
1073 * Further when MDT pools will be ready we need to add
1074 * a cycle here and setup pools of both types. Another
1075 * approach is to find out pool of which type should be
 * created. */
1077 rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
1079 CERROR("%s: can't alloc pool %s: rc = %d\n",
1080 obd->obd_name, poolname, rc);
1084 rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
1086 CERROR("%s: can't prepare pool for %s: rc = %d\n",
1087 obd->obd_name, poolname, rc);
1091 CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
/* error path: drop both the lookup ref and the alloc ref */
1096 qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1098 qpi_putref(&env, qpi);
1099 qpi_putref(&env, qpi);
/*
 * Walk a slave index object and accumulate the granted space of every
 * quota ID into the matching lqe's lqe_recalc_granted.  ID 0 (root) is
 * skipped.  The iteration honours kthread_should_stop() so the recalc
 * thread can be interrupted.
 */
1107 qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
1108 struct lquota_site *site)
1110 struct qmt_thread_info *qti = qmt_info(env);
1111 union lquota_id *qid = &qti->qti_id;
1112 const struct dt_it_ops *iops;
1119 iops = &obj->do_index_ops->dio_it;
1121 it = iops->init(env, obj, 0);
1123 CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
1124 PFID(&qti->qti_fid), PTR_ERR(it));
1125 RETURN(PTR_ERR(it));
1128 rc = iops->load(env, it, 0);
1130 CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
1131 PFID(&qti->qti_fid), rc);
1133 } else if (rc == 0) {
1134 rc = iops->next(env, it);
1136 GOTO(out, rc = (rc < 0) ? rc : 0);
1140 struct lquota_entry *lqe;
1142 key = iops->key(env, it);
1144 CWARN("quota: error key for "DFID": rc = %ld\n",
1145 PFID(&qti->qti_fid), PTR_ERR(key));
1146 GOTO(out, rc = PTR_ERR(key));
1149 /* skip the root user/group */
1150 if (*((__u64 *)key) == 0)
1153 qid->qid_uid = *((__u64 *)key);
1155 rc = qmt_slv_read(env, qid, obj, &granted);
1159 lqe = lqe_locate(env, site, qid);
1161 GOTO(out, rc = PTR_ERR(lqe));
1162 lqe_write_lock(lqe);
1163 lqe->lqe_recalc_granted += granted;
1164 lqe_write_unlock(lqe);
1167 rc = iops->next(env, it);
1169 CWARN("quota: failed to parse index "DFID
1170 ", ->next error: rc = %d\n",
1171 PFID(&qti->qti_fid), rc);
1172 } while (rc == 0 && !kthread_should_stop());
1176 iops->fini(env, it);
/*
 * cfs_hash for-each callback finishing a pool recalculation: when the
 * recomputed granted space (lqe_recalc_granted) differs from the stored
 * value, update the lqe, re-adjust qunit/edquot, reseed the global data
 * of all related lqes, notify lock holders and persist the new granted
 * value through a local transaction.
 */
1180 static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1181 struct hlist_node *hnode, void *data)
1183 struct lquota_entry *lqe;
1184 struct lu_env *env = data;
1186 lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
1187 LASSERT(atomic_read(&lqe->lqe_ref) > 0);
1189 lqe_write_lock(lqe);
1190 if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
1191 struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
1193 bool need_notify = false;
1196 LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
1197 lqe->lqe_recalc_granted);
1198 lqe->lqe_granted = lqe->lqe_recalc_granted;
1199 /* Always returns true, if there is no slaves in a pool */
1200 need_notify |= qmt_adjust_qunit(env, lqe);
1201 need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
1203 /* Find all lqes with lqe_id to reseed lgd array */
1204 rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
1205 lqe_qtype(lqe), &lqe->lqe_id);
1207 struct lquota_entry *lqeg = qti_lqes_glbl(env);
1209 mutex_lock(&lqeg->lqe_glbl_data_lock);
1210 if (lqeg->lqe_glbl_data)
1211 qmt_seed_glbe(env, lqeg->lqe_glbl_data);
1212 mutex_unlock(&lqeg->lqe_glbl_data_lock);
1213 qmt_id_lock_notify(qmt, lqeg);
/* persist the recalculated granted value on disk */
1217 th = dt_trans_create(env, qmt->qmt_child);
1221 rc = lquota_disk_declare_write(env, th,
1227 rc = dt_trans_start_local(env, qmt->qmt_child, th);
1231 qmt_glb_write(env, th, lqe, 0, NULL);
1233 dt_trans_stop(env, qmt->qmt_child, th);
/* reset the accumulator for the next recalc pass */
1236 lqe->lqe_recalc_granted = 0;
1237 lqe_write_unlock(lqe);
/*
 * Derive the MGC obd_device of this QMT by building the "<fsname>-MDT0000"
 * device name from the QMT's service name and looking up its mount info.
 * Returns the MGC on success or an ERR_PTR on failure.
 */
1242 #define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
1243 static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
1245 char mdt_name[MDT_DEV_NAME_LEN];
1246 struct lustre_mount_info *lmi;
1247 struct obd_device *obd;
1251 rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
1253 CERROR("quota: cannot get server name from %s: rc = %d\n",
1254 qmt->qmt_svname, rc);
1255 RETURN(ERR_PTR(rc));
1258 strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
1259 lmi = server_get_mount(mdt_name);
1262 CERROR("%s: cannot get mount info from %s: rc = %d\n",
1263 qmt->qmt_svname, mdt_name, rc);
1264 RETURN(ERR_PTR(rc));
1266 obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
/* drop the reference taken by server_get_mount() */
1267 lustre_put_lsi(lmi->lmi_sb);
/* Kthread entry point: recompute the granted space of every lquota entry of
 * a pool by walking each slave index file, then sync the results back via
 * qmt_site_recalc_cb. 'args' is the qmt_pool_info, referenced by
 * qmt_start_pool_recalc (ref dropped at the bottom).
 * NOTE(review): many lines (declarations, error checks, labels) are elided
 * in this view. */
1272 static int qmt_pool_recalc(void *args)
1274 struct qmt_pool_info *pool, *glbl_pool;
1275 struct obd_device *obd;
1277 int i, rc, qtype, slaves_cnt;
1283 rc = lu_env_init(&env, LCT_MD_THREAD);
1285 CERROR("%s: cannot init env: rc = %d\n",
1286 pool->qpi_qmt->qmt_svname, rc);
1290 obd = qmt_get_mgc(pool->qpi_qmt);
1292 GOTO(out, rc = PTR_ERR(obd));
/* Waiting for the end of processing mgs config.
 * It is needed to be sure all pools are configured. */
1297 while (obd->obd_process_conf)
1298 schedule_timeout_uninterruptible(cfs_time_seconds(1));
1300 CFS_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val);
1301 qmt_sarr_read_down(pool);
/* Hold this to be sure that OSTs from this pool
 * can't do acquire/release.
 *
 * I guess below write semaphore could be a bottleneck
 * as qmt_dqacq would be blocked trying to hold
 * read_lock at qmt_pool_lookup->qti_pools_add.
 * But on the other hand adding/removing OSTs to the pool is
 * a rare operation. If finally this would be a problem,
 * we can consider another approach. For example we can
 * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
 * and go through appropriate OSTs. I don't use this approach now
 * as newly created pool hasn't lqes entries. So firstly we need
 * to get this lqes from the global pool index file. This
 * solution looks more complex, so leave it as it is. */
1317 down_write(&pool->qpi_recalc_sem);
1319 glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
1320 if (IS_ERR(glbl_pool))
1321 GOTO(out, rc = PTR_ERR(glbl_pool));
1323 slaves_cnt = qmt_sarr_count(pool);
1324 CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
1325 slaves_cnt, pool->qpi_name);
/* iterate over every quota type and every slave of the pool */
1327 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1328 for (i = 0; i < slaves_cnt; i++) {
1329 struct qmt_thread_info *qti = qmt_info(&env);
1330 struct dt_object *slv_obj;
1331 struct obd_uuid uuid;
/* allow the thread to be stopped between slaves */
1334 if (kthread_should_stop())
1335 GOTO(out_stop, rc = 0);
1336 idx = qmt_sarr_get_idx(pool, i);
/* We don't need fsname here - anyway
 * lquota_disk_slv_filename ignores it. */
1341 snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1342 lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
/* look-up index file associated with acquiring slave */
1345 slv_obj = lquota_disk_slv_find(&env,
1346 glbl_pool->qpi_qmt->qmt_child,
1347 glbl_pool->qpi_root,
1350 if (IS_ERR(slv_obj))
1351 GOTO(out_stop, rc = PTR_ERR(slv_obj));
1353 CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
1354 slv_obj, uuid.uuid);
1355 qmt_obj_recalc(&env, slv_obj, pool->qpi_site[qtype]);
1356 dt_object_put(&env, slv_obj);
/* Now go through the site hash and compare lqe_granted
 * with lqe_recalc_granted. Write new value if they disagree */
1361 cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
1362 qmt_site_recalc_cb, &env);
1366 qpi_putref(&env, glbl_pool);
1368 if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
/*
 * Someone is waiting for us to stop - be sure not to exit
 * before kthread_stop() gets a ref on the task. No event
 * will happen on 'pool', this is just a convenient way to
 */
1375 wait_var_event(pool, kthread_should_stop());
1377 clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
/* Pool can't be changed, since sem has been down.
 * Thus until up_read, no one can restart recalc thread. */
1382 qmt_sarr_read_up(pool);
1383 up_write(&pool->qpi_recalc_sem);
/* qpi_getref has been called in qmt_start_pool_recalc,
 * however we can't call qpi_putref if lu_env_init failed. */
1389 if (env.le_ctx.lc_state == LCS_ENTERED) {
1390 qpi_putref(&env, pool);
/* Start the per-pool recalculation kthread for 'qpi' unless one is already
 * running. QPI_FLAG_RECALC_OFFSET acts as the "thread exists" guard; the
 * ref taken on qpi (presumably before this point — elided in this view) is
 * dropped here on failure, otherwise by the thread itself. */
1397 static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
1399 struct task_struct *task;
/* test_and_set_bit: only the first caller spawns the thread */
1402 if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
1403 LASSERT(!qpi->qpi_recalc_task);
1406 task = kthread_create(qmt_pool_recalc, qpi,
1407 "qsd_reint_%s", qpi->qpi_name);
/* creation failed: undo the flag and drop the pool ref */
1409 clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
1411 qpi_putref(env, qpi);
1413 qpi->qpi_recalc_task = task;
/* Using park/unpark to start the thread ensures that
 * the thread function does get calls, so the
 * ref on qpi will be dropped
 */
1419 kthread_unpark(task);
/* Stop the pool's recalc kthread, if any: atomically claim the task pointer
 * with xchg() so exactly one of this function / the exiting thread handles
 * the shutdown handshake (see qmt_pool_recalc's xchg on the same field). */
1426 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
1428 struct task_struct *task;
1430 task = xchg(&qpi->qpi_recalc_task, NULL);
/* Adjust the pool's per-qtype OST slave counters after an OST with index
 * 'idx' was added to or removed from the pool: a slave only counts if its
 * index file already exists in the global pool. Returns 0 or negative errno.
 * NOTE(review): the 'add' parameter line and some branches are elided in
 * this view. */
1435 static int qmt_pool_slv_nr_change(const struct lu_env *env,
1436 struct qmt_pool_info *pool,
1439 struct qmt_pool_info *glbl_pool;
1442 glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
1443 if (IS_ERR(glbl_pool))
1444 RETURN(PTR_ERR(glbl_pool));
1446 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1447 struct qmt_thread_info *qti = qmt_info(env);
1448 struct dt_object *slv_obj;
1449 struct obd_uuid uuid;
/* We don't need fsname here - anyway
 * lquota_disk_slv_filename ignores it. */
1453 snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1454 lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
/* look-up index file associated with acquiring slave */
1457 slv_obj = lquota_disk_slv_find(env,
1458 glbl_pool->qpi_qmt->qmt_child,
1459 glbl_pool->qpi_root,
1462 if (IS_ERR(slv_obj))
1466 pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
1468 pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
1469 dt_object_put(env, slv_obj);
1471 qpi_putref(env, glbl_pool);
/* Common implementation for qmt_pool_add()/qmt_pool_rem(): add (add=true)
 * or remove (add=false) OST 'slavename' to/from quota pool 'poolname' on
 * this QMT, update slave counters and kick off pool recalculation.
 * Returns 0 on success, negative errno on failure.
 * NOTE(review): several lines (declarations, error branches) are elided in
 * this view; only the visible code is documented. */
1476 static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
1477 char *slavename, bool add)
1479 struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
1480 struct qmt_pool_info *qpi;
1485 if (qmt->qmt_stopping)
1488 if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1489 RETURN(-ENAMETOOLONG);
1491 CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
1492 "%s: pool %s, removing %s\n",
1493 obd->obd_name, poolname, slavename);
/* only OST targets are valid quota pool members */
1495 rc = server_name2index(slavename, &idx, NULL);
1496 if (rc != LDD_F_SV_TYPE_OST)
1499 rc = lu_env_init(&env, LCT_MD_THREAD);
1501 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1505 qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
/* FIX: "%long" is not a valid printk conversion (it parses as octal
 * "%lo" followed by the literal "ng"); PTR_ERR() returns long, so the
 * correct specifier is "%ld". */
1507 CWARN("%s: can't find pool %s: rc = %ld\n",
1508 obd->obd_name, poolname, PTR_ERR(qpi));
1509 GOTO(out, rc = PTR_ERR(qpi));
1512 rc = add ? qmt_sarr_pool_add(qpi, idx, QMT_STYPE_OST) :
1513 qmt_sarr_pool_rem(qpi, idx);
/* message is checked in sanity-quota test_1b */
1516 CERROR("%s: can't %s %s pool '%s': rc = %d\n",
1517 obd->obd_name, add ? "add to" : "remove", slavename,
1519 GOTO(out_putref, rc);
1521 qmt_pool_slv_nr_change(&env, qpi, idx, add);
1522 qmt_start_pool_recalc(&env, qpi);
1525 qpi_putref(&env, qpi);
 * Add a single target device to the named pool.
 *
 * \param[in] obd	OBD device on which to add the pool
 * \param[in] poolname	name of the pool to which to add the target \a slavename
 * \param[in] slavename	name of the target device to be added
 *
 * \retval 0		if \a slavename was (previously) added to the pool
 * \retval negative	error number on failure
1543 int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
/* thin wrapper: delegate to the shared add/remove implementation */
1545 return qmt_pool_add_rem(obd, poolname, slavename, true);
 * Remove the named target from the specified pool.
 *
 * \param[in] obd	OBD device from which to remove \a poolname
 * \param[in] poolname	name of the pool to be changed
 * \param[in] slavename	name of the target to remove from \a poolname
 *
 * \retval 0		on successfully removing \a slavename from the pool
 * \retval negative	number on error (e.g. \a slavename not in pool)
1558 int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
/* thin wrapper: delegate to the shared add/remove implementation */
1560 return qmt_pool_add_rem(obd, poolname, slavename, false);
 * Remove the named pool from the QMT device.
 *
 * \param[in] obd	OBD device on which pool was previously created
 * \param[in] poolname	name of pool to remove from \a obd
 *
 * \retval 0		on successfully removing the pool
 * \retval negative	error numbers for failures
1572 int qmt_pool_del(struct obd_device *obd, char *poolname)
1574 struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
1575 struct qmt_pool_info *qpi;
1577 char buf[LQUOTA_NAME_MAX];
1583 if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1584 RETURN(-ENAMETOOLONG);
1586 CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
1589 rc = lu_env_init(&env, LCT_MD_THREAD);
1591 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
/* look-up pool in charge of this global index FID */
1596 qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
/* Valid case for several MDTs at the same node -
 * pool removed by the 1st MDT in config */
1600 CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
1602 RETURN(PTR_ERR(qpi));
/* unlink the per-qtype global index files inside the pool dir */
1605 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1606 lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
1607 snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
1608 rc = local_object_unlink(&env, qmt->qmt_child,
1609 qpi->qpi_root, buf);
1611 CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
1612 obd->obd_name, buf, poolname, rc);
/* put ref from look-up */
1616 qpi_putref(&env, qpi);
/* put last ref to free qpi */
1618 qpi_putref(&env, qpi);
/* finally remove the pool directory itself (e.g. "dt-<poolname>") */
1620 snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
1621 RES_NAME(LQUOTA_RES_DT), poolname);
1622 rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
1624 CWARN("%s: cannot unlink dir %s: rc = %d\n",
1625 obd->obd_name, poolname, rc);
/* Initialize the pool's slave (OST index) array; 0 initial capacity. */
1631 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
1633 return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
/* Add OST index 'idx' to the pool's slave array, taking the locked or
 * unlocked lu_tgt_pool variant depending on 'locked'. DOM (MDT-backed data)
 * pools keep no array, so the add is a no-op for them. 32 is the array
 * grow step passed to lu_tgt_pool_add*. */
1637 _qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int stype, bool locked)
/* We don't have an array for DOM */
1640 if (qmt_dom(qpi->qpi_rtype, stype))
1644 return lu_tgt_pool_add_locked(&qpi->qpi_sarr.osts, idx, 32);
1646 return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, 32);
/* Remove OST index 'idx' from the pool's slave array. */
1649 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
1651 return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
/* Free the pool's slave array, if it was ever allocated (op_array may be
 * NULL for pools that never had slaves added). */
1654 static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi)
1656 if (qpi->qpi_sarr.osts.op_array)
1657 lu_tgt_pool_free(&qpi->qpi_sarr.osts);
/* Check whether OST index 'idx' belongs to the pool. The global pool
 * implicitly contains every target, so membership always holds there. */
1660 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
1662 if (qmt_pool_global(qpi))
1665 return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
/* Translate a position in the pool's slave array (arr_idx) into the actual
 * OST index stored there; asserts arr_idx is within [0, op_count). */
1668 int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
1670 LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
1671 "idx invalid %d op_count %d\n", arr_idx,
1672 qpi->qpi_sarr.osts.op_count);
1673 return qpi->qpi_sarr.osts.op_array[arr_idx];
1676 /* Number of slaves in a pool */
1677 unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
1679 return qpi->qpi_sarr.osts.op_count;