4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2012, 2017, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 * A Quota Master Target has a list(qmt_pool_list) where it stores qmt_pool_info
33 * structures. There is one such structure for each pool managed by the QMT.
35 * Each pool can have different quota types enforced (typically user & group
36 * quota). A pool is in charge of managing lquota_entry structures for each
37 * quota type. This is done by creating one lquota_entry site per quota
38 * type. A site stores entries in a hash table and read quota settings from disk
39 * when a given ID isn't present in the hash.
41 * The pool API exported here is the following:
42 * - qmt_pool_init(): initializes the general QMT structures used to manage
44 * - qmt_pool_fini(): frees the structures allocated by qmt_pool_init().
45 * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46 * - qmt_pool_new_conn(): is used to create a new slave index file.
47 * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
51 #define DEBUG_SUBSYSTEM S_LQUOTA
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
57 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
58 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
60 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
61 static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi);
62 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
63 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
66 * Static helper functions not used outside the scope of this file
/*
 * Drop one reference on @pool in a context where the caller is known to
 * hold at least one other reference: the LASSERT guarantees this is never
 * the final put, so no teardown path is needed here.
 */
69 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
71 LASSERT(atomic_read(&pool->qpi_ref) > 1);
72 atomic_dec(&pool->qpi_ref);
75 /* some procfs helpers */
/*
 * procfs "state" read handler: dumps pool-wide information (resource type
 * name, reference count, least qunit) followed by per-quota-type data
 * (slave count and number of cached lquota entries in the site hash).
 * @m->private was set to the qmt_pool_info at lprocfs_register() time.
 */
76 static int qpi_state_seq_show(struct seq_file *m, void *data)
78 struct qmt_pool_info *pool = m->private;
81 LASSERT(pool != NULL);
83 seq_printf(m, "pool:\n"
87 " least qunit: %lu\n",
89 RES_NAME(pool->qpi_rtype),
90 atomic_read(&pool->qpi_ref),
91 pool->qpi_least_qunit);
93 for (type = 0; type < LL_MAXQUOTAS; type++)
94 seq_printf(m, " %s:\n"
98 qpi_slv_nr(pool, type),
99 atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
/* generate read-only file operations (qpi_state_fops) from the show cb */
103 LPROC_SEQ_FOPS_RO(qpi_state);
/* procfs read handler: print the pool's soft least qunit as a single
 * decimal value followed by a newline. */
105 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
107 struct qmt_pool_info *pool = m->private;
108 LASSERT(pool != NULL);
110 seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
/*
 * procfs write handler: tune qpi_soft_least_qunit for a data (block) pool.
 * The user-supplied value is parsed with kstrtoll_from_user() and then
 * rounded to a valid qunit value before being stored; metadata (inode)
 * pools reject the write.
 * NOTE(review): the loop body and the overflow branch between the lines
 * below are not visible in this fragment — confirm the rounding logic
 * against qmt_adjust_qunit() before relying on exact semantics.
 */
115 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
116 size_t count, loff_t *off)
118 struct seq_file *m = file->private_data;
119 struct qmt_pool_info *pool = m->private;
120 long long least_qunit;
123 LASSERT(pool != NULL);
125 /* Not tuneable for inode limit */
126 if (pool->qpi_rtype != LQUOTA_RES_DT)
129 rc = kstrtoll_from_user(buffer, count, 0, &least_qunit);
133 /* Minimal qpi_soft_least_qunit */
134 qunit = pool->qpi_least_qunit << 2;
135 /* The value must be power of minimal qpi_soft_least_qunit, see
136 * how the qunit is adjusted in qmt_adjust_qunit(). */
137 while (qunit > 0 && qunit < least_qunit)
/* clamp to the largest value that keeps the low two bits clear */
140 qunit = INT_MAX & ~3;
142 pool->qpi_soft_least_qunit = qunit;
/* generate read/write file operations (qpi_soft_least_qunit_fops) */
145 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
/* Table of per-pool procfs entries registered by qmt_pool_alloc():
 * "state" (read-only dump) and "soft_least_qunit" (tunable). */
147 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
149 .fops = &qpi_state_fops },
150 { .name = "soft_least_qunit",
151 .fops = &qpi_soft_least_qunit_fops },
156 * Allocate a new qmt_pool_info structure and add it to qmt_pool_list.
158 * \param env - is the environment passed by the caller
159 * \param qmt - is the quota master target
160 * \param pool_name - is the name of the pool to be created
160 * \param pool_type - is the resource type of this pool instance, either
161 * LQUOTA_RES_MD or LQUOTA_RES_DT.
163 * \retval - 0 on success, appropriate error on failure
/*
 * Allocate and initialize a qmt_pool_info, register its procfs directory
 * ("<rtype>-<pool_name>") and entries, initialize the slave array, and
 * link it into qmt->qmt_pool_list under qmt_pool_lock.  The pool starts
 * with a refcount of 1; the error path drops that reference, which frees
 * the structure.
 */
165 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
166 char *pool_name, int pool_type)
168 struct qmt_thread_info *qti = qmt_info(env);
169 struct qmt_pool_info *pool;
176 INIT_LIST_HEAD(&pool->qpi_linkage);
177 init_waitqueue_head(&pool->qpi_recalc_thread.t_ctl_waitq);
178 thread_set_flags(&pool->qpi_recalc_thread, SVC_STOPPED);
179 init_rwsem(&pool->qpi_recalc_sem);
181 pool->qpi_rtype = pool_type;
183 /* initialize refcount to 1, hash table will then grab an additional
185 atomic_set(&pool->qpi_ref, 1);
187 /* set up least qunit size to use for this pool */
188 pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
/* data pools get a 4x soft least qunit; metadata pools use it as-is */
189 if (pool_type == LQUOTA_RES_DT)
190 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
192 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
194 /* grab reference on master target that this pool belongs to */
195 lu_device_get(qmt2lu_dev(qmt));
196 lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
199 /* create pool proc directory */
200 snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
201 RES_NAME(pool_type), pool_name);
/* NOTE(review): strncpy() does not NUL-terminate when pool_name is
 * QPI_MAXNAME bytes or longer — callers appear to bound the name
 * (qmt_pool_new checks LOV_MAXPOOLNAME); confirm QPI_MAXNAME covers it. */
202 strncpy(pool->qpi_name, pool_name, QPI_MAXNAME);
203 pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
204 lprocfs_quota_qpi_vars, pool);
/* procfs failure is logged but non-fatal: pool works without proc entry */
205 if (IS_ERR(pool->qpi_proc)) {
206 rc = PTR_ERR(pool->qpi_proc);
207 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
208 qmt->qmt_svname, qti->qti_buf, rc);
209 pool->qpi_proc = NULL;
213 rc = qmt_sarr_pool_init(pool);
217 /* add to qmt pool list */
218 down_write(&qmt->qmt_pool_lock);
219 list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
220 up_write(&qmt->qmt_pool_lock);
/* error path: */
224 /* this frees the pool structure since refcount is equal to 1 */
225 qpi_putref(env, pool);
230 * Delete a qmt_pool_info instance and all structures associated.
232 * \param env - is the environment passed by the caller
233 * \param pool - is the qmt_pool_info structure to free
/*
 * Tear down a qmt_pool_info: unlink it from the QMT pool list, stop the
 * recalc thread, free the slave array, remove procfs entries, release the
 * per-qtype grace-time lqes, sites and global index objects, then drop the
 * references on the pool root directory and on the master target device.
 * Called once the last reference on the pool is gone.
 */
235 void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
237 struct qmt_device *qmt = pool->qpi_qmt;
241 /* remove from list */
242 down_write(&qmt->qmt_pool_lock);
243 list_del_init(&pool->qpi_linkage);
244 up_write(&qmt->qmt_pool_lock);
/* sanity check — a live refcount here would mean a use-after-free risk */
246 if (atomic_read(&pool->qpi_ref) > 0)
249 qmt_stop_pool_recalc(pool);
250 qmt_sarr_pool_free(pool);
252 /* release proc entry */
253 if (pool->qpi_proc) {
254 lprocfs_remove(&pool->qpi_proc);
255 pool->qpi_proc = NULL;
258 /* release per-quota type site used to manage quota entries as well as
259 * references to global index files */
260 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
261 /* release lqe storing grace time */
262 if (pool->qpi_grace_lqe[qtype] != NULL)
263 lqe_putref(pool->qpi_grace_lqe[qtype]);
266 if (pool->qpi_site[qtype] != NULL &&
267 !IS_ERR(pool->qpi_site[qtype]))
268 lquota_site_free(env, pool->qpi_site[qtype]);
269 /* release reference to global index */
270 if (pool->qpi_glb_obj[qtype] != NULL &&
271 !IS_ERR(pool->qpi_glb_obj[qtype]))
272 dt_object_put(env, pool->qpi_glb_obj[qtype]);
275 /* release reference on pool directory */
276 if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
277 dt_object_put(env, pool->qpi_root);
279 /* release reference on the master target */
280 if (pool->qpi_qmt != NULL) {
281 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
283 lu_ref_del(&ld->ld_reference, "pool", pool);
285 pool->qpi_qmt = NULL;
288 LASSERT(list_empty(&pool->qpi_linkage));
/* Reset the per-thread pool array: no pools collected yet, capacity set
 * to the inline (qti_pools_small) size. Must be called before
 * qti_pools_add() — see the LASSERTF there. */
292 static inline void qti_pools_init(const struct lu_env *env)
294 struct qmt_thread_info *qti = qmt_info(env);
296 qti->qti_pools_cnt = 0;
297 qti->qti_pools_num = QMT_MAX_POOL_NUM;
300 #define qti_pools(qti) (qti->qti_pools_num > QMT_MAX_POOL_NUM ? \
301 qti->qti_pools : qti->qti_pools_small)
302 #define qti_pools_env(env) \
303 (qmt_info(env)->qti_pools_num > QMT_MAX_POOL_NUM ? \
304 qmt_info(env)->qti_pools : qmt_info(env)->qti_pools_small)
305 #define qti_pools_cnt(env) (qmt_info(env)->qti_pools_cnt)
/*
 * Append @qpi to the per-thread pool array, growing the array (doubling
 * capacity via OBD_ALLOC/memcpy) when it is full.  Takes qpi_recalc_sem
 * for read on the added pool so its lqes cannot be changed by the recalc
 * thread until qti_pools_fini() releases it.  The global pool is kept at
 * index 0, swapping with any earlier entry when needed.
 */
307 static inline int qti_pools_add(const struct lu_env *env,
308 struct qmt_pool_info *qpi)
310 struct qmt_thread_info *qti = qmt_info(env);
311 struct qmt_pool_info **pools = qti->qti_pools;
313 pools = qti_pools(qti);
314 LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
315 "Forgot init? %p\n", qti);
317 if (qti->qti_pools_cnt > qti->qti_pools_num) {
318 OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
321 memcpy(pools, qti_pools(qti), qti->qti_pools_cnt * sizeof(qpi));
322 /* Don't need to free, if it is the very 1st allocation */
323 if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
324 OBD_FREE(qti->qti_pools,
325 qti->qti_pools_num * sizeof(qpi));
326 qti->qti_pools = pools;
327 qti->qti_pools_num *= 2;
331 /* Take this to protect pool's lqes against changing by
332 * recalculation thread. This would be unlocked at
334 down_read(&qpi->qpi_recalc_sem);
335 if (qmt_pool_global(qpi) && qti_pools_cnt(env) > 0) {
336 pools[qti->qti_pools_cnt++] = pools[0];
337 /* Store global pool always at index 0 */
340 pools[qti->qti_pools_cnt++] = qpi;
343 CDEBUG(D_QUOTA, "Pool %s is added, pools %p qti_pools %p pool_num %d\n",
344 qpi->qpi_name, pools, qti->qti_pools, qti->qti_pools_cnt);
/* Release everything collected by qti_pools_add(): drop the read lock on
 * each pool's qpi_recalc_sem, put the pool references, and free the
 * dynamically grown array (the inline qti_pools_small needs no free). */
349 static inline void qti_pools_fini(const struct lu_env *env)
351 struct qmt_thread_info *qti = qmt_info(env);
352 struct qmt_pool_info **pools = qti->qti_pools;
355 LASSERT(qti->qti_pools_cnt > 0);
357 pools = qti_pools(qti);
358 for (i = 0; i < qti->qti_pools_cnt; i++) {
359 up_read(&pools[i]->qpi_recalc_sem);
360 qpi_putref(env, pools[i]);
363 if (qti->qti_pools_num > QMT_MAX_POOL_NUM)
364 OBD_FREE(qti->qti_pools,
365 qti->qti_pools_num * sizeof(struct qmt_pool_info *));
369 * Look-up a pool in a list based on the type.
371 * \param env - is the environment passed by the caller
372 * \param qmt - is the quota master target
373 * \param rtype - is the type of this pool, either LQUOTA_RES_MD or
375 * \param pool_name - is the pool name to search for
376 * \param idx - OST or MDT index to search for. When it is >= 0, function
377 * returns array with pointers to all pools that include
378 * targets with requested index.
379 * \param add - add to qti_pool_arr if true
/*
 * Walk qmt_pool_list under qmt_pool_lock (read) and find pools matching
 * @rtype and either @pool_name or slave index @idx.  With idx >= 0 every
 * matching pool is accumulated into the per-thread array via
 * qti_pools_add() and the first (global) one is returned; otherwise a
 * single pool is looked up by name (defaulting to GLB_POOL_NAME).
 * Returns ERR_PTR(-ENOENT) when the list is empty or nothing matches.
 */
381 struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
382 struct qmt_device *qmt,
387 struct qmt_pool_info *pos, *pool;
391 down_read(&qmt->qmt_pool_lock);
392 if (list_empty(&qmt->qmt_pool_list)) {
393 up_read(&qmt->qmt_pool_lock);
394 RETURN(ERR_PTR(-ENOENT));
397 CDEBUG(D_QUOTA, "type %d name %p index %d\n",
398 rtype, pool_name, idx);
399 /* Now just find a pool with correct type in a list. Further we need
400 * to go through the list and find a pool that includes requested OST
401 * or MDT. Possibly this would return a list of pools that includes
402 * needed target(OST/MDT). */
404 if (idx == -1 && !pool_name)
405 pool_name = GLB_POOL_NAME;
407 list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
408 if (pos->qpi_rtype != rtype)
/* index search: collect every pool containing target @idx */
411 if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
412 rc = qti_pools_add(env, pos);
/* name search: exact pool-name match */
418 if (pool_name && !strncmp(pool_name, pos->qpi_name,
422 rc = qti_pools_add(env, pos);
431 up_read(&qmt->qmt_pool_lock);
/* for index search the global pool sits at slot 0 of the array */
433 if (idx >= 0 && qti_pools_cnt(env))
434 pool = qti_pools_env(env)[0];
436 RETURN(pool ? : ERR_PTR(-ENOENT));
/* error path from qti_pools_add() */
438 CERROR("%s: cannot add pool %s: err = %d\n",
439 qmt->qmt_svname, pos->qpi_name, rc);
444 * Functions implementing the pool API, used by the qmt handlers
448 * Destroy all pools which are still in the pool list.
450 * \param env - is the environment passed by the caller
451 * \param qmt - is the quota master target
/*
 * Destroy all pools still linked on qmt->qmt_pool_list by dropping the
 * initial reference each pool got in qmt_pool_alloc(); the final put
 * triggers qmt_pool_free(), which unlinks the pool from the list.
 */
454 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
456 struct qmt_pool_info *pool, *tmp;
459 /* parse list of pool and destroy each element */
460 list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
461 /* release extra reference taken in qmt_pool_alloc */
462 qpi_putref(env, pool);
464 LASSERT(list_empty(&qmt->qmt_pool_list));
470 * Initialize pool configure for the quota master target. For now, we only
471 * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
472 * pool which are instantiated in this function.
474 * \param env - is the environment passed by the caller
475 * \param qmt - is the quota master target for which we have to initialize the
478 * \retval - 0 on success, appropriate error on failure
/*
 * Initialize the QMT pool framework: set up the pool list and its rwsem,
 * then allocate one global pool per resource type (metadata and data).
 * On allocation failure all previously created pools are torn down via
 * qmt_pool_fini().
 */
480 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
485 INIT_LIST_HEAD(&qmt->qmt_pool_list);
486 init_rwsem(&qmt->qmt_pool_lock);
488 /* Instantiate pool master for the default data and metadata pool.
489 * This code will have to be revisited once we support quota on
490 * non-default pools */
491 for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
492 rc = qmt_pool_alloc(env, qmt, GLB_POOL_NAME, res);
498 qmt_pool_fini(env, qmt);
/*
 * lquota_disk_for_each_slv() callback: for each slave index file found on
 * disk, derive the quota type from the global index FID and the slave
 * type from the slave name's uuid, then bump the corresponding counter in
 * the caller-provided nr[QMT_STYPE_CNT][LL_MAXQUOTAS] array (@arg).
 */
503 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
504 char *slv_name, struct lu_fid *slv_fid, void *arg)
506 struct obd_uuid uuid;
507 int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
511 rc = lquota_extract_fid(glb_fid, NULL, &qtype);
514 obd_str2uuid(&uuid, slv_name);
515 stype = qmt_uuid2idx(&uuid, NULL);
519 (*nr)[stype][qtype]++;
520 CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
521 slv_name, stype, qtype, (*nr)[stype][qtype]);
527 * Set up on-disk index files associated with each pool.
529 * \param env - is the environment passed by the caller
530 * \param qmt - is the quota master target for which we have to initialize the
532 * \param qmt_root - is the on-disk directory created for the QMT.
533 * \param name - is the pool name that we need to setup. Setup all pools
534 * in qmt_pool_list when name is NULL.
536 * \retval - 0 on success, appropriate error on failure
/*
 * Set up the on-disk state for one pool (@name) or for every pool on the
 * list (@name == NULL).  Per pool: create the "<rtype>-<name>" directory
 * under @qmt_root, then for each quota type create/open the global index
 * (seeding default grace time and an initial version for brand-new
 * indexes), allocate the lquota site, count already-connected slaves,
 * cache the ID-0 lqe that stores the global grace time, and add a procfs
 * dump file for the global index.
 */
538 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
539 struct dt_object *qmt_root, char *name)
541 struct qmt_thread_info *qti = qmt_info(env);
542 struct lquota_glb_rec *rec = &qti->qti_glb_rec;
543 struct qmt_pool_info *pool;
544 struct dt_device *dev = NULL;
545 dt_obj_version_t version;
546 struct list_head *pos;
547 int rc = 0, i, qtype;
550 /* iterate over each pool in the list and allocate a quota site for each
551 * one. This involves creating a global index file on disk */
552 list_for_each(pos, &qmt->qmt_pool_list) {
553 struct dt_object *obj;
554 struct lquota_entry *lqe;
558 pool = list_entry(pos, struct qmt_pool_info,
/* when a specific pool was requested, skip all others */
561 pool_name = pool->qpi_name;
562 if (name && strncmp(pool_name, name, LOV_MAXPOOLNAME))
564 rtype = pool->qpi_rtype;
566 dev = pool->qpi_qmt->qmt_child;
568 /* allocate directory for this pool */
569 snprintf(qti->qti_buf, LQUOTA_NAME_MAX, "%s-%s",
570 RES_NAME(rtype), pool_name);
571 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
574 RETURN(PTR_ERR(obj));
575 pool->qpi_root = obj;
577 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
578 /* Generating FID of global index in charge of storing
579 * settings for this quota type */
580 lquota_generate_fid(&qti->qti_fid, rtype, qtype);
582 /* open/create the global index file for this quota
583 * type. If name is set, it means we came here from
584 * qmt_pool_new and can create glb index with a
585 * local generated FID. */
586 obj = lquota_disk_glb_find_create(env, dev,
589 name ? true : false);
592 CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
593 qmt->qmt_svname, qtype_name(qtype), rc);
597 pool->qpi_glb_obj[qtype] = obj;
599 version = dt_version_get(env, obj);
600 /* set default grace time for newly created index */
602 rec->qbr_hardlimit = 0;
603 rec->qbr_softlimit = 0;
604 rec->qbr_granted = 0;
605 rec->qbr_time = rtype == LQUOTA_RES_MD ?
606 MAX_IQ_TIME : MAX_DQ_TIME;
/* record for ID 0 carries the pool-wide default grace time */
608 rc = lquota_disk_write_glb(env, obj, 0, rec);
610 CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
611 qmt->qmt_svname, qtype_name(qtype), rc);
615 rc = lquota_disk_update_ver(env, dev, obj, 1);
617 CERROR("%s: failed to set initial version for %s type: rc = %d\n",
618 qmt->qmt_svname, qtype_name(qtype), rc);
623 /* create quota entry site for this quota type */
624 pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
627 if (IS_ERR(pool->qpi_site[qtype])) {
628 rc = PTR_ERR(pool->qpi_site[qtype]);
629 CERROR("%s: failed to create site for %s type: rc = %d\n",
630 qmt->qmt_svname, qtype_name(qtype), rc);
634 /* count number of slaves which already connected to
635 * the master in the past */
636 for (i = 0; i < QMT_STYPE_CNT; i++)
637 pool->qpi_slv_nr[i][qtype] = 0;
639 rc = lquota_disk_for_each_slv(env, pool->qpi_root,
644 CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
645 qmt->qmt_svname, qtype_name(qtype), rc);
649 /* Global grace time is stored in quota settings of
651 qti->qti_id.qid_uid = 0;
653 /* look-up quota entry storing grace time */
654 lqe = lqe_locate(env, pool->qpi_site[qtype],
657 RETURN(PTR_ERR(lqe));
658 pool->qpi_grace_lqe[qtype] = lqe;
659 #ifdef CONFIG_PROC_FS
660 /* add procfs file to dump the global index, mostly for
661 * debugging purpose */
662 snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
663 "glb-%s", qtype_name(qtype));
664 rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
665 0444, &lprocfs_quota_seq_fops,
/* procfs failure is non-fatal: warn and continue */
668 CWARN("%s: Error adding procfs file for global quota index "DFID": rc = %d\n",
669 qmt->qmt_svname, PFID(&qti->qti_fid), rc);
680 * Handle new slave connection. Called when a slave enqueues the global quota
681 * lock at the beginning of the reintegration procedure.
683 * \param env - is the environment passed by the caller
684 * \param qmt - is the quota master target handling this request
685 * \param glb_fid - is the fid of the global index file
686 * \param slv_fid - is the fid of the newly created slave index file
687 * \param slv_ver - is the current version of the slave index file
688 * \param uuid - is the uuid of slave which is (re)connecting to the master
691 * \retval - 0 on success, appropriate error on failure
/*
 * Handle a (re)connecting slave: resolve its slave type and target index
 * from @uuid, find the pools covering that index, then locate — or create
 * on -ENOENT — the slave index file under the pool root.  Outputs the
 * slave FID and its current version to the caller and bumps the slave
 * counter of every matched pool.
 */
693 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
694 struct lu_fid *glb_fid, struct lu_fid *slv_fid,
695 __u64 *slv_ver, struct obd_uuid *uuid)
697 struct qmt_pool_info *pool;
698 struct dt_object *slv_obj;
699 int pool_type, qtype, stype;
700 bool created = false;
703 stype = qmt_uuid2idx(uuid, &idx);
707 /* extract pool info from global index FID */
708 rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
712 /* look-up pool in charge of this global index FID */
714 pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
716 RETURN(PTR_ERR(pool));
718 /* look-up slave index file */
719 slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
721 if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
722 /* create slave index file */
723 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
724 pool->qpi_root, glb_fid,
728 if (IS_ERR(slv_obj)) {
729 rc = PTR_ERR(slv_obj);
730 CERROR("%s: failed to create quota slave index file for %s (%d)"
731 "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
735 /* retrieve slave fid & current object version */
736 memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
737 *slv_ver = dt_version_get(env, slv_obj);
738 dt_object_put(env, slv_obj);
/* every pool containing this target gains one slave for this qtype */
740 for (i = 0; i < qti_pools_cnt(env); i++)
741 qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
748 * Look-up a lquota_entry in the pool hash and allocate it if not found.
750 * \param env - is the environment passed by the caller
751 * \param qmt - is the quota master target for which we have to initialize the
753 * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
754 * \param qtype - is the quota type, either user or group.
755 * \param qid - is the quota ID to look-up
757 * \retval - valid pointer to lquota entry on success, appropriate error on
/*
 * Return the lquota_entry for @qid in the named pool.  ID 0 is the grace
 * time entry, for which a reference is held permanently in
 * qpi_grace_lqe[] — no site lookup needed.  The pool reference taken by
 * the lookup is dropped before returning.
 */
760 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
761 struct qmt_device *qmt,
762 int pool_type, int qtype,
763 union lquota_id *qid,
766 struct qmt_pool_info *pool;
767 struct lquota_entry *lqe;
770 /* look-up pool responsible for this global index FID */
771 pool = qmt_pool_lookup_name(env, qmt, pool_type, pool_name);
773 RETURN(ERR_CAST(pool));
775 if (qid->qid_uid == 0) {
776 /* caller wants to access grace time, no need to look up the
777 * entry since we keep a reference on ID 0 all the time */
778 lqe = pool->qpi_grace_lqe[qtype];
783 /* now that we have the pool, let's look-up the quota entry in the
784 * right quota site */
785 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
787 qpi_putref(env, pool);
/*
 * Collect into the per-thread lqe array one lquota_entry per pool that
 * covers target @idx (data resource) — MDT slaves always resolve to the
 * global pool since MDT pools are not implemented yet.  Asserts that the
 * entry at the global slot really belongs to the global pool.
 */
791 int qmt_pool_lqes_lookup(const struct lu_env *env,
792 struct qmt_device *qmt,
793 int rtype, int stype,
794 int qtype, union lquota_id *qid,
795 char *pool_name, int idx)
797 struct qmt_pool_info *pool;
798 struct lquota_entry *lqe;
802 /* Until MDT pools are not implemented, all MDTs belong to
803 * global pool, thus lookup lqes only from global pool. */
804 if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
809 /* look-up pool responsible for this global index FID */
810 pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
813 RETURN(PTR_ERR(pool));
816 /* now that we have the pool, let's look-up the quota entry in the
817 * right quota site */
819 for (i = 0; i < qti_pools_cnt(env); i++) {
820 pool = qti_pools_env(env)[i];
821 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
824 GOTO(out, rc = PTR_ERR(lqe));
826 /* Only release could be done for not enforced lqe
827 * (see qmt_dqacq0). However slave could request to
828 * release more than not global lqe had granted before
829 * lqe_enforced was cleared. It is legal case,
830 * because even if current lqe is not enforced,
831 * lqes from other pools are still active and available
832 * for acquiring. Furthermore, skip not enforced lqe
833 * to don't make extra allocations. */
834 /*if (!lqe_is_glbl(lqe) && !lqe->lqe_enforced) {
838 qti_lqes_add(env, lqe);
840 LASSERT(qti_lqes_glbl(env)->lqe_is_global);
/* Comparator for sort(): orders lquota entries by ascending lqe_qunit.
 * NOTE(review): returns only 0 or 1, never negative; sort() comparators
 * conventionally return <0/0/>0 — confirm the in-kernel sort() tolerates
 * this asymmetric form (it does for a "greater-than" predicate, at the
 * cost of stability). */
847 static int lqes_cmp(const void *arg1, const void *arg2)
849 const struct lquota_entry *lqe1, *lqe2;
852 return lqe1->lqe_qunit > lqe2->lqe_qunit;
/*
 * Sort the per-thread lqe array by qunit (see lqes_cmp), then rescan it
 * to record the new position of the global lqe, since sorting may have
 * moved it away from the cached index.
 */
855 void qmt_lqes_sort(const struct lu_env *env)
857 sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
858 /* global lqe was moved during sorting */
859 if (!qti_lqes_glbl(env)->lqe_is_global) {
861 for (i = 0; i < qti_lqes_cnt(env); i++) {
862 if (qti_lqes(env)[i]->lqe_is_global) {
863 qti_glbl_lqe_idx(env) = i;
/*
 * Gather, from every pool of @rtype that has slaves of @qtype, the
 * already-instantiated and enforced lqe for @qid into the per-thread
 * array.  Unlike qmt_pool_lqes_lookup() this uses lqe_find() (no
 * allocation); -ENOENT from a non-global pool just means the ID has no
 * settings there, and other errors are noted but do not abort the scan.
 */
870 int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
871 int rtype, int qtype, union lquota_id *qid)
873 struct qmt_pool_info *pos;
874 struct lquota_entry *lqe;
878 down_read(&qmt->qmt_pool_lock);
879 if (list_empty(&qmt->qmt_pool_list)) {
880 up_read(&qmt->qmt_pool_lock);
884 list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
885 if (pos->qpi_rtype != rtype)
887 /* Don't take into account pools without slaves */
888 if (!qpi_slv_nr(pos, qtype))
890 lqe = lqe_find(env, pos->qpi_site[qtype], qid);
891 /* ENOENT is valid case for lqe from non global pool
892 * that hasn't limits, i.e. not enforced. Continue even
893 * in case of error - we can handle already found lqes */
894 if (IS_ERR_OR_NULL(lqe)) {
895 /* let know that something went wrong */
896 rc = lqe ? PTR_ERR(lqe) : -ENOENT;
899 if (!lqe->lqe_enforced) {
900 /* no settings for this qid_uid */
904 qti_lqes_add(env, lqe);
905 CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
908 up_read(&qmt->qmt_pool_lock);
913 * Allocate a new pool for the specified device.
915 * Allocate a new pool_desc structure for the specified \a new_pool
916 * device to create a pool with the given \a poolname. The new pool
917 * structure is created with a single reference, and is freed when the
918 * reference count drops to zero.
920 * \param[in] obd Lustre OBD device on which to add a pool iterator
921 * \param[in] poolname the name of the pool to be created
923 * \retval 0 in case of success
924 * \retval negative error code in case of error
/*
 * Create a quota pool for an OST pool named @poolname (data resource type
 * only until MDT pools exist).  Fails with -EEXIST when the pool is
 * already present — a valid situation when several MDTs run on one node.
 * On allocation/preparation failure, the error path looks the pool up
 * again and puts it twice: once for the lookup reference and once for
 * the initial qmt_pool_alloc() reference, freeing the pool.
 */
926 int qmt_pool_new(struct obd_device *obd, char *poolname)
928 struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
929 struct qmt_pool_info *qpi;
934 if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
935 RETURN(-ENAMETOOLONG);
937 rc = lu_env_init(&env, LCT_MD_THREAD);
939 CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
943 qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
945 /* Valid case when several MDTs are mounted
946 * at the same node. */
947 CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
948 qpi_putref(&env, qpi);
949 GOTO(out_env, rc = -EEXIST);
951 if (PTR_ERR(qpi) != -ENOENT) {
952 CWARN("%s: pool %s lookup failed: rc = %ld\n",
953 obd->obd_name, poolname, PTR_ERR(qpi));
954 GOTO(out_env, rc = PTR_ERR(qpi));
957 /* Now allocate and prepare only DATA pool.
958 * Further when MDT pools will be ready we need to add
959 * a cycle here and setup pools of both types. Another
960 * approach is to find out pool of which type should be
962 rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
964 CERROR("%s: can't alloc pool %s: rc = %d\n",
965 obd->obd_name, poolname, rc);
969 rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
971 CERROR("%s: can't prepare pool for %s: rc = %d\n",
972 obd->obd_name, poolname, rc);
976 CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
/* error path: re-lookup and double put to drop both references */
981 qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
983 qpi_putref(&env, qpi);
984 qpi_putref(&env, qpi);
/*
 * Iterate over a slave index object @obj and, for every non-root quota
 * ID, read the amount granted to the slave and accumulate it into the
 * corresponding lqe's lqe_recalc_granted in @site.  Stops early when the
 * recalc @thread is asked to stop.  The iterator is properly finalized
 * on every exit path.
 */
992 qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
993 struct ptlrpc_thread *thread, struct lquota_site *site)
995 struct qmt_thread_info *qti = qmt_info(env);
996 union lquota_id *qid = &qti->qti_id;
997 const struct dt_it_ops *iops;
1004 iops = &obj->do_index_ops->dio_it;
1006 it = iops->init(env, obj, 0);
1008 CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
1009 PFID(&qti->qti_fid), PTR_ERR(it));
1010 RETURN(PTR_ERR(it));
1013 rc = iops->load(env, it, 0);
1015 CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
1016 PFID(&qti->qti_fid), rc);
1018 } else if (rc == 0) {
/* load() == 0 means positioned before first record: advance once */
1019 rc = iops->next(env, it);
1021 GOTO(out, rc = (rc < 0) ? rc : 0);
1025 struct lquota_entry *lqe;
1027 key = iops->key(env, it);
1029 CWARN("quota: error key for "DFID": rc = %ld\n",
1030 PFID(&qti->qti_fid), PTR_ERR(key));
1031 GOTO(out, rc = PTR_ERR(key));
1034 /* skip the root user/group */
1035 if (*((__u64 *)key) == 0)
1038 qid->qid_uid = *((__u64 *)key);
1040 rc = qmt_slv_read(env, qid, obj, &granted);
1044 lqe = lqe_locate(env, site, qid);
1046 GOTO(out, rc = PTR_ERR(lqe));
1047 lqe_write_lock(lqe);
1048 lqe->lqe_recalc_granted += granted;
1049 lqe_write_unlock(lqe);
1052 rc = iops->next(env, it);
1054 CWARN("quota: failed to parse index "DFID
1055 ", ->next error: rc = %d\n",
1056 PFID(&qti->qti_fid), rc);
1057 } while (rc == 0 && thread_is_running(thread));
1061 iops->fini(env, it);
/*
 * cfs_hash_for_each() callback run after a pool recalculation pass: when
 * the recomputed grant (lqe_recalc_granted) differs from lqe_granted,
 * adopt the new value, re-adjust qunit/edquot, reseed and notify the
 * per-ID lock data, and persist the updated global record in a local
 * transaction.  lqe_recalc_granted is reset for the next recalc pass.
 */
1065 static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1066 struct hlist_node *hnode, void *data)
1068 struct lquota_entry *lqe;
1069 struct lu_env *env = data;
1071 lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
1072 LASSERT(atomic_read(&lqe->lqe_ref) > 0);
1074 lqe_write_lock(lqe);
1075 if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
1076 struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
1078 bool need_notify = false;
1081 LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
1082 lqe->lqe_recalc_granted);
1083 lqe->lqe_granted = lqe->lqe_recalc_granted;
1084 /* Always returns true, if there is no slaves in a pool */
1085 need_notify |= qmt_adjust_qunit(env, lqe);
1086 need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
1088 /* Find all lqes with lqe_id to reseed lgd array */
1089 rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
1090 lqe_qtype(lqe), &lqe->lqe_id);
1091 if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
1093 qti_lqes_glbl(env)->lqe_glbl_data);
1094 qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
/* persist the corrected granted value in its own transaction */
1098 th = dt_trans_create(env, qmt->qmt_child);
1102 rc = lquota_disk_declare_write(env, th,
1108 rc = dt_trans_start_local(env, qmt->qmt_child, th);
1112 qmt_glb_write(env, th, lqe, 0, NULL);
1114 dt_trans_stop(env, qmt->qmt_child, th);
1117 lqe->lqe_recalc_granted = 0;
1118 lqe_write_unlock(lqe);
/* buffer size for "<fsname>-MDT0000" device names, NUL included */
1123 #define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
/*
 * Resolve the MGC obd_device for this QMT by deriving the filesystem name
 * from qmt_svname, appending "-MDT0000" and querying its mount info; the
 * lsi reference is dropped before returning the MGC pointer.  Used by the
 * recalc thread to wait until MGS configuration processing completes.
 */
1124 static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
1126 char mdt_name[MDT_DEV_NAME_LEN];
1127 struct lustre_mount_info *lmi;
1128 struct obd_device *obd;
1132 rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
1134 CERROR("quota: cannot get server name from %s: rc = %d\n",
1135 qmt->qmt_svname, rc);
1136 RETURN(ERR_PTR(rc));
1139 strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
1140 lmi = server_get_mount(mdt_name);
1143 CERROR("%s: cannot get mount info from %s: rc = %d\n",
1144 qmt->qmt_svname, mdt_name, rc);
1145 RETURN(ERR_PTR(rc));
1147 obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
1148 lustre_put_lsi(lmi->lmi_sb);
/*
 * Pool recalculation thread body (started by qmt_start_pool_recalc).
 * After MGS config processing settles, it takes qpi_recalc_sem for write
 * to freeze acquire/release on the pool, then for every quota type and
 * every slave in the pool's target array re-reads the slave index from
 * the global pool and rebuilds lqe_recalc_granted via qmt_obj_recalc();
 * qmt_site_recalc_cb() then reconciles and persists any difference.
 * On exit the thread marks itself stopped, clears the recalc flag and
 * drops the pool reference taken at thread start.
 */
1153 static int qmt_pool_recalc(void *args)
1155 struct qmt_pool_info *pool, *glbl_pool;
1156 struct rw_semaphore *sem = NULL;
1157 struct obd_device *obd;
1159 int i, rc, qtype, slaves_cnt;
1163 thread_set_flags(&pool->qpi_recalc_thread, SVC_RUNNING);
1165 obd = qmt_get_mgc(pool->qpi_qmt);
1167 GOTO(out, rc = PTR_ERR(obd));
1169 /* Waiting for the end of processing mgs config.
1170 * It is needed to be sure all pools are configured. */
1171 while (obd->obd_process_conf)
1172 schedule_timeout_uninterruptible(cfs_time_seconds(1));
1174 sem = qmt_sarr_rwsem(pool);
1177 /* Hold this to be sure that OSTs from this pool
1178 * can't do acquire/release.
1180 * I guess below write semaphore could be a bottleneck
1181 * as qmt_dqacq would be blocked trying to hold
1182 * read_lock at qmt_pool_lookup->qti_pools_add.
1183 * But on the other hand adding/removing OSTs to the pool is
1184 * a rare operation. If finally this would be a problem,
1185 * we can consider another approach. For example we can
1186 * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
1187 * and go through appropriate OSTs. I don't use this approach now
1188 * as newly created pool hasn't lqes entries. So firstly we need
1189 * to get this lqes from the global pool index file. This
1190 * solution looks more complex, so leave it as it is. */
1191 down_write(&pool->qpi_recalc_sem);
1193 rc = lu_env_init(&env, LCT_MD_THREAD);
1195 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
/* slave indexes live under the global pool's root directory */
1199 glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
1200 if (IS_ERR(glbl_pool))
1201 GOTO(out_env, rc = PTR_ERR(glbl_pool));
1203 slaves_cnt = qmt_sarr_count(pool);
1204 CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
1205 slaves_cnt, pool->qpi_name);
1207 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1208 for (i = 0; i < slaves_cnt; i++) {
1209 struct qmt_thread_info *qti = qmt_info(&env);
1210 struct dt_object *slv_obj;
1211 struct obd_uuid uuid;
/* bail out promptly if qmt_stop_pool_recalc() was called */
1214 if (thread_is_stopping(&pool->qpi_recalc_thread))
1215 GOTO(out_stop, rc = 0);
1216 idx = qmt_sarr_get_idx(pool, i);
1219 /* We don't need fsname here - anyway
1220 * lquota_disk_slv_filename ignores it. */
1221 snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1222 lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1224 /* look-up index file associated with acquiring slave */
1225 slv_obj = lquota_disk_slv_find(&env,
1226 glbl_pool->qpi_qmt->qmt_child,
1227 glbl_pool->qpi_root,
1230 if (IS_ERR(slv_obj))
1231 GOTO(out_stop, rc = PTR_ERR(slv_obj));
1233 CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
1234 slv_obj, uuid.uuid);
1235 qmt_obj_recalc(&env, slv_obj,
1236 &pool->qpi_recalc_thread,
1237 pool->qpi_site[qtype]);
1238 dt_object_put(&env, slv_obj);
1240 /* Now go through the site hash and compare lqe_granted
1241 * with lqe_calc_granted. Write new value if disagree */
1243 cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
1244 qmt_site_recalc_cb, &env);
1248 qpi_putref(&env, glbl_pool);
1252 thread_set_flags(&pool->qpi_recalc_thread, SVC_STOPPED);
1253 wake_up(&pool->qpi_recalc_thread.t_ctl_waitq);
1254 clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
1255 /* Pool can't be changed, since sem has been down.
1256 * Thus until up_read, no one can restart recalc thread. */
1259 up_write(&pool->qpi_recalc_sem);
1261 qpi_putref(&env, pool);
1266 static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
1268 struct task_struct *task;
1272 if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
1273 LASSERT(thread_is_stopped(&qpi->qpi_recalc_thread) ||
1274 thread_is_init(&qpi->qpi_recalc_thread));
1275 OBD_ALLOC(name, QPI_MAXNAME + sizeof("qmt_pool_recalc_"));
1279 snprintf(name, QPI_MAXNAME, "qsd_reint_%s",
1283 thread_set_flags(&qpi->qpi_recalc_thread, SVC_STARTING);
1284 task = kthread_run(qmt_pool_recalc, qpi, name);
1286 thread_set_flags(&qpi->qpi_recalc_thread, SVC_STOPPED);
1287 clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
1289 qpi_putref(env, qpi);
1291 OBD_FREE(name, QPI_MAXNAME + sizeof("qmt_pool_recalc_"));
1297 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
1299 struct ptlrpc_thread *thread = &qpi->qpi_recalc_thread;
1301 if (!thread_is_stopped(thread)) {
1302 thread_set_flags(thread, SVC_STOPPING);
1303 wake_up(&thread->t_ctl_waitq);
1305 wait_event_idle(thread->t_ctl_waitq,
1306 thread_is_stopped(thread));
/*
 * Adjust this pool's per-quota-type slave counters when OST @idx is
 * added to or removed from the pool.
 *
 * For each quota type the slave index file of OST @idx is looked up
 * under the global (LQUOTA_RES_DT) pool; when it is found,
 * pool->qpi_slv_nr[QMT_STYPE_OST][qtype] is incremented (add) or
 * decremented (remove).
 *
 * NOTE(review): this listing is truncated (embedded source line numbers
 * jump), so code below is left byte-identical and only comments are
 * added.  The dropped IS_ERR(slv_obj) branch presumably skips the
 * counter update for that qtype - TODO confirm against the full source.
 */
1310 static int qmt_pool_slv_nr_change(const struct lu_env *env,
1311 struct qmt_pool_info *pool,
1314 struct qmt_pool_info *glbl_pool;
/* slave index files live under the global DT pool's root directory */
1317 glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
1318 if (IS_ERR(glbl_pool))
1319 RETURN(PTR_ERR(glbl_pool));
1321 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1322 struct qmt_thread_info *qti = qmt_info(env);
1323 struct dt_object *slv_obj;
1324 struct obd_uuid uuid;
1326 /* We don't need fsname here - anyway
1327 * lquota_disk_slv_filename ignores it. */
1328 snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
1329 lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
1331 /* look-up index file associated with acquiring slave */
1332 slv_obj = lquota_disk_slv_find(env,
1333 glbl_pool->qpi_qmt->qmt_child,
1334 glbl_pool->qpi_root,
1337 if (IS_ERR(slv_obj))
1341 pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
1343 pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
/* drop the reference taken by lquota_disk_slv_find() */
1344 dt_object_put(env, slv_obj);
/* drop the reference taken by qmt_pool_lookup_glb() above */
1346 qpi_putref(env, glbl_pool);
1351 static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
1352 char *slavename, bool add)
1354 struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
1355 struct qmt_pool_info *qpi;
1360 if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1361 RETURN(-ENAMETOOLONG);
1363 CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
1364 "%s: pool %s, removing %s\n",
1365 obd->obd_name, poolname, slavename);
1367 rc = server_name2index(slavename, &idx, NULL);
1368 if (rc != LDD_F_SV_TYPE_OST)
1371 rc = lu_env_init(&env, LCT_MD_THREAD);
1373 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1377 qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1379 CWARN("%s: can't find pool %s: rc = %long\n",
1380 obd->obd_name, poolname, PTR_ERR(qpi));
1381 GOTO(out, rc = PTR_ERR(qpi));
1384 rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
1385 qmt_sarr_pool_rem(qpi, idx);
1387 CERROR("%s: can't %s %s pool %s: rc = %d\n",
1388 add ? "add to" : "remove", obd->obd_name,
1389 slavename, poolname, rc);
1390 GOTO(out_putref, rc);
1392 qmt_pool_slv_nr_change(&env, qpi, idx, add);
1393 qmt_start_pool_recalc(&env, qpi);
1396 qpi_putref(&env, qpi);
1405 * Add a single target device to the named pool.
1407 * \param[in] obd OBD device on which to add the pool
1408 * \param[in] poolname name of the pool to which to add the target \a slavename
1409 * \param[in] slavename name of the target device to be added
1411 * \retval 0 if \a slavename was (previously) added to the pool
1412 * \retval negative error number on failure
1414 int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
1416 return qmt_pool_add_rem(obd, poolname, slavename, true);
1420 * Remove the named target from the specified pool.
1422 * \param[in] obd OBD device from which to remove \a poolname
1423 * \param[in] poolname name of the pool to be changed
1424 * \param[in] slavename name of the target to remove from \a poolname
1426 * \retval 0 on successfully removing \a slavename from the pool
1427 * \retval negative number on error (e.g. \a slavename not in pool)
1429 int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
1431 return qmt_pool_add_rem(obd, poolname, slavename, false);
1435 * Remove the named pool from the QMT device.
1437 * \param[in] obd OBD device on which pool was previously created
1438 * \param[in] poolname name of pool to remove from \a obd
1440 * \retval 0 on successfully removing the pool
1441 * \retval negative error numbers for failures
/*
 * NOTE(review): this listing is truncated (embedded source line numbers
 * jump), so code is left byte-identical and only comments are added.
 */
1443 int qmt_pool_del(struct obd_device *obd, char *poolname)
1445 struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
1446 struct qmt_pool_info *qpi;
1448 char buf[LQUOTA_NAME_MAX];
/* reject names longer than the LOV pool name limit */
1454 if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
1455 RETURN(-ENAMETOOLONG);
1457 CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
1460 rc = lu_env_init(&env, LCT_MD_THREAD);
1462 CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
1466 /* look-up pool in charge of this global index FID */
1467 qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
1469 /* Valid case for several MDTs at the same node -
1470 * pool removed by the 1st MDT in config */
1471 CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
1473 RETURN(PTR_ERR(qpi));
/* unlink the per-quota-type global index files under the pool's
 * root directory; unlink failures are warned about but do not
 * abort the removal */
1476 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
1477 lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
1478 snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
1479 rc = local_object_unlink(&env, qmt->qmt_child,
1480 qpi->qpi_root, buf);
1482 CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
1483 obd->obd_name, buf, poolname, rc);
1486 /* put ref from look-up */
1487 qpi_putref(&env, qpi);
1488 /* put last ref to free qpi */
1489 qpi_putref(&env, qpi);
/* finally remove the pool directory itself ("<res>-<poolname>")
 * from the QMT root */
1491 snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
1492 RES_NAME(LQUOTA_RES_DT), poolname);
1493 rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
1495 CWARN("%s: cannot unlink dir %s: rc = %d\n",
1496 obd->obd_name, poolname, rc);
/*
 * Initialize the slave (OST) array of a named pool.
 *
 * NOTE(review): the LQUOTA_RES_MD/default arms of the switch are
 * missing from this truncated listing; code is left byte-identical.
 */
1502 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
1505 /* No need to initialize sarray for global pool
1506 * as it always includes all slaves */
1507 if (qmt_pool_global(qpi))
1510 switch (qpi->qpi_rtype) {
/* data pools track their OST members in an ost_pool array */
1512 return tgt_pool_init(&qpi->qpi_sarr.osts, 0);
/*
 * Add OST index @idx to the pool's slave array; @min is the minimum
 * allocation step passed through to tgt_pool_add().
 * NOTE(review): non-DT switch arms are missing from this listing.
 */
1519 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
1521 switch (qpi->qpi_rtype) {
1523 return tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
/*
 * Remove OST index @idx from the pool's slave array.
 * NOTE(review): non-DT switch arms are missing from this listing.
 */
1530 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
1532 switch (qpi->qpi_rtype) {
1534 return tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
/*
 * Free the pool's slave array.  The global pool never allocated one
 * (see qmt_sarr_pool_init), and the op_array check below guards
 * against freeing an array that was never populated.
 * NOTE(review): non-DT switch arms are missing from this listing.
 */
1541 static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi)
1543 if (qmt_pool_global(qpi))
1546 switch (qpi->qpi_rtype) {
1548 if (!qpi->qpi_sarr.osts.op_array)
1550 return tgt_pool_free(&qpi->qpi_sarr.osts);
/*
 * Check whether OST index @idx is a member of the pool.  The global
 * pool implicitly contains every slave, hence the early path.
 * NOTE(review): non-DT switch arms are missing from this listing.
 */
1557 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
1559 if (qmt_pool_global(qpi))
1562 switch (qpi->qpi_rtype) {
1564 return tgt_check_index(idx, &qpi->qpi_sarr.osts);
/*
 * Return the rw_semaphore protecting the pool's slave array, so
 * callers can serialise membership changes against readers.
 * NOTE(review): non-DT switch arms are missing from this listing.
 */
1571 inline struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
1573 switch (qpi->qpi_rtype) {
1575 /* to protect ost_pool use */
1576 return &qpi->qpi_sarr.osts.op_rw_sem;
/*
 * Translate array position @arr_idx into the OST index stored there.
 * For the global pool positions map 1:1 to OST indices (the dropped
 * early-return presumably returns arr_idx itself - TODO confirm).
 * NOTE(review): non-DT switch arms are missing from this listing.
 */
1583 inline int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
1586 if (qmt_pool_global(qpi))
1589 switch (qpi->qpi_rtype) {
/* bounds-check before dereferencing op_array */
1591 LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
1592 "idx invalid %d op_count %d\n", arr_idx,
1593 qpi->qpi_sarr.osts.op_count);
1594 return qpi->qpi_sarr.osts.op_array[arr_idx];
1601 /* Number of slaves in a pool */
1602 inline unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
1604 switch (qpi->qpi_rtype) {
1606 return qpi->qpi_sarr.osts.op_count;