4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2012, 2013, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 * A Quota Master Target has a hash table where it stores qmt_pool_info
33 * structures. There is one such structure for each pool managed by the QMT.
35 * Each pool can have different quota types enforced (typically user & group
36 * quota). A pool is in charge of managing lquota_entry structures for each
37 * quota type. This is done by creating one lquota_entry site per quota
38 * type. A site stores entries in a hash table and reads quota settings from disk
39 * when a given ID isn't present in the hash.
41 * The pool API exported here is the following:
42 * - qmt_pool_init(): initializes the general QMT structures used to manage
44 * - qmt_pool_fini(): frees the structures allocated by qmt_pool_init().
45 * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46 * - qmt_pool_new_conn(): is used to create a new slave index file.
47 * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
51 #define DEBUG_SUBSYSTEM S_LQUOTA
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
57 static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
60 * Static helper functions not used outside the scope of this file
64 * Reference counter management for qmt_pool_info structures
66 static inline void qpi_getref(struct qmt_pool_info *pool)
68 atomic_inc(&pool->qpi_ref);
71 static inline void qpi_putref(const struct lu_env *env,
72 struct qmt_pool_info *pool)
74 LASSERT(atomic_read(&pool->qpi_ref) > 0);
75 if (atomic_dec_and_test(&pool->qpi_ref))
76 qmt_pool_free(env, pool);
79 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
81 LASSERT(atomic_read(&pool->qpi_ref) > 1);
82 atomic_dec(&pool->qpi_ref);
86 * Hash functions for qmt_pool_info management
90 qpi_hash_hash(struct cfs_hash *hs, const void *key, unsigned mask)
92 return cfs_hash_u32_hash(*((__u32 *)key), mask);
95 static void *qpi_hash_key(struct hlist_node *hnode)
97 struct qmt_pool_info *pool;
98 pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
99 return &pool->qpi_key;
102 static int qpi_hash_keycmp(const void *key, struct hlist_node *hnode)
104 struct qmt_pool_info *pool;
105 pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
106 return pool->qpi_key == *((__u32 *)key);
109 static void *qpi_hash_object(struct hlist_node *hnode)
111 return hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
114 static void qpi_hash_get(struct cfs_hash *hs, struct hlist_node *hnode)
116 struct qmt_pool_info *pool;
117 pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
121 static void qpi_hash_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
123 struct qmt_pool_info *pool;
124 pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
125 qpi_putref_locked(pool);
/*
 * Hash callback installed as hs_exit. It only logs an error: no pool is
 * expected to be left in the table when this is invoked.
 */
static void qpi_hash_exit(struct cfs_hash *hs, struct hlist_node *hnode)
{
	CERROR("Should not have any item left!\n");
}
133 /* vector of hash operations */
134 static struct cfs_hash_ops qpi_hash_ops = {
135 .hs_hash = qpi_hash_hash,
136 .hs_key = qpi_hash_key,
137 .hs_keycmp = qpi_hash_keycmp,
138 .hs_object = qpi_hash_object,
139 .hs_get = qpi_hash_get,
140 .hs_put_locked = qpi_hash_put_locked,
141 .hs_exit = qpi_hash_exit
144 /* some procfs helpers */
/*
 * seq_file show handler dumping the state of one pool. The pool comes from
 * m->private and must not be NULL. The first seq_printf() reports the pool
 * identifier (low 16 bits of qpi_key), the resource name derived from the
 * upper bits of qpi_key, the current reference count and the least qunit.
 * The loop then emits one section per quota type (0 .. LL_MAXQUOTAS - 1)
 * with the slave count (qpi_slv_nr) and the number of entries currently in
 * that type's lquota site hash.
 *
 * The LPROC_SEQ_FOPS_RO(qpi_state) invocation that follows presumably
 * generates the read-only qpi_state_fops referenced by
 * lprocfs_quota_qpi_vars - confirm against the macro definition.
 */
145 static int qpi_state_seq_show(struct seq_file *m, void *data)
147 struct qmt_pool_info *pool = m->private;
150 LASSERT(pool != NULL);
152 seq_printf(m, "pool:\n"
156 " least qunit: %lu\n",
157 pool->qpi_key & 0x0000ffff,
158 RES_NAME(pool->qpi_key >> 16),
159 atomic_read(&pool->qpi_ref),
160 pool->qpi_least_qunit);
162 for (type = 0; type < LL_MAXQUOTAS; type++)
163 seq_printf(m, " %s:\n"
167 pool->qpi_slv_nr[type],
168 atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
172 LPROC_SEQ_FOPS_RO(qpi_state);
174 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
176 struct qmt_pool_info *pool = m->private;
177 LASSERT(pool != NULL);
179 return seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
/*
 * procfs write handler used to tune a pool's qpi_soft_least_qunit.
 * The pool is reached through the seq_file stored in file->private_data
 * and must not be NULL. Pools whose resource type (upper bits of qpi_key)
 * is not LQUOTA_RES_DT are special-cased: per the in-line comment the value
 * is not tunable for inode limits. The user buffer is parsed with
 * lprocfs_str_to_s64(); the value finally stored is computed starting from
 * qpi_least_qunit << 2, the minimal soft least qunit.
 *
 * The LPROC_SEQ_FOPS(qpi_soft_least_qunit) invocation that follows
 * presumably generates the qpi_soft_least_qunit_fops referenced by
 * lprocfs_quota_qpi_vars - confirm against the macro definition.
 */
183 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
184 size_t count, loff_t *off)
186 struct qmt_pool_info *pool;
190 pool = ((struct seq_file *)file->private_data)->private;
191 LASSERT(pool != NULL);
193 /* Not tuneable for inode limit */
194 if (pool->qpi_key >> 16 != LQUOTA_RES_DT)
197 rc = lprocfs_str_to_s64(buffer, count, &least_qunit);
201 /* Minimal qpi_soft_least_qunit */
202 qunit = pool->qpi_least_qunit << 2;
203 /* The value must be power of minimal qpi_soft_least_qunit, see
204 * how the qunit is adjusted in qmt_adjust_qunit(). */
205 while (qunit > 0 && qunit < least_qunit)
/* fallback value, already a multiple of 4 (low two bits cleared) */
208 qunit = INT_MAX & ~3;
210 pool->qpi_soft_least_qunit = qunit;
213 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
/*
 * Table of procfs entries created for each pool; qmt_pool_alloc() passes it
 * to lprocfs_register() with the pool as private data. It wires
 * qpi_state_fops and qpi_soft_least_qunit_fops (the latter under the name
 * "soft_least_qunit").
 */
215 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
217 .fops = &qpi_state_fops },
218 { .name = "soft_least_qunit",
219 .fops = &qpi_soft_least_qunit_fops },
224 * Allocate a new qmt_pool_info structure and add it to the pool hash table
227 * \param env - is the environment passed by the caller
228 * \param qmt - is the quota master target
229 * \param pool_id - is the 16-bit pool identifier of the new pool to add
230 * \param pool_type - is the resource type of this pool instance, either
231 * LQUOTA_RES_MD or LQUOTA_RES_DT.
233 * \retval - 0 on success, appropriate error on failure
/*
 * Summary of the visible steps:
 *  - initialize the list linkage, the hash key and the reference count (1);
 *  - set qpi_least_qunit from LQUOTA_LEAST_QUNIT(pool_type); the soft least
 *    qunit is four times larger (<< 2) for LQUOTA_RES_DT pools;
 *  - register the procfs directory "<resource name>-0x<pool id>" under
 *    qmt->qmt_proc, exposing lprocfs_quota_qpi_vars;
 *  - take a reference on the master target's lu_device;
 *  - insert the pool in qmt->qmt_pool_hash (must be unique) and append it
 *    to qmt->qmt_pool_list.
 * Failures are logged with CERROR; the cleanup path drops the initial
 * reference with qpi_putref(), which frees the pool.
 */
235 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
236 int pool_id, int pool_type)
238 struct qmt_thread_info *qti = qmt_info(env);
239 struct qmt_pool_info *pool;
246 INIT_LIST_HEAD(&pool->qpi_linkage);
248 /* assign key used by hash functions: pool type in the upper bits, pool
 * id in the lower 16 bits */
249 pool->qpi_key = pool_id + (pool_type << 16);
251 /* initialize refcount to 1, hash table will then grab an additional
253 atomic_set(&pool->qpi_ref, 1);
255 /* set up least qunit size to use for this pool */
256 pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
257 if (pool_type == LQUOTA_RES_DT)
258 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
260 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
262 /* create pool proc directory */
263 sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
264 pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
265 lprocfs_quota_qpi_vars, pool);
266 if (IS_ERR(pool->qpi_proc)) {
267 rc = PTR_ERR(pool->qpi_proc);
268 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
269 qmt->qmt_svname, qti->qti_buf, rc);
/* reset to NULL so that qmt_pool_free() does not try to remove it */
270 pool->qpi_proc = NULL;
274 /* grab reference on master target that this pool belongs to */
275 lu_device_get(qmt2lu_dev(qmt));
276 lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
279 /* add to qmt hash table */
280 rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key,
283 CERROR("%s: failed to add pool %s to qmt hash (%d)\n",
284 qmt->qmt_svname, qti->qti_buf, rc);
288 /* add to qmt pool list */
289 list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
293 /* this frees the pool structure since refcount is equal to 1 */
294 qpi_putref(env, pool);
299 * Delete a qmt_pool_info instance and all structures associated.
301 * \param env - is the environment passed by the caller
302 * \param pool - is the qmt_pool_info structure to free
/*
 * Called by qpi_putref() once the last reference on the pool is dropped.
 * Releases, in order: the procfs directory, then for every quota type the
 * grace-time lquota entry, the lquota site and the global index object,
 * then the pool directory object and finally the reference held on the
 * master target. NULL and error-pointer members are skipped, so a pool
 * that was only partially set up can be freed too.
 */
304 static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
309 /* release proc entry */
310 if (pool->qpi_proc) {
311 lprocfs_remove(&pool->qpi_proc);
312 pool->qpi_proc = NULL;
315 /* release per-quota type site used to manage quota entries as well as
316 * references to global index files */
317 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
318 /* release lqe storing grace time */
319 if (pool->qpi_grace_lqe[qtype] != NULL)
320 lqe_putref(pool->qpi_grace_lqe[qtype]);
/* release the site; qpi_site[] may hold an error pointer when
 * lquota_site_alloc() failed in qmt_pool_prepare() */
323 if (pool->qpi_site[qtype] != NULL &&
324 !IS_ERR(pool->qpi_site[qtype]))
325 lquota_site_free(env, pool->qpi_site[qtype]);
326 /* release reference to global index */
327 if (pool->qpi_glb_obj[qtype] != NULL &&
328 !IS_ERR(pool->qpi_glb_obj[qtype]))
329 lu_object_put(env, &pool->qpi_glb_obj[qtype]->do_lu);
332 /* release reference on pool directory */
333 if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
334 lu_object_put(env, &pool->qpi_root->do_lu);
336 /* release reference on the master target */
337 if (pool->qpi_qmt != NULL) {
338 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
341 lu_ref_del(&ld->ld_reference, "pool", pool);
342 pool->qpi_qmt = NULL;
/* the pool must already have been unlinked from qmt_pool_list */
345 LASSERT(list_empty(&pool->qpi_linkage));
350 * Look-up a pool in the hash table based on the pool ID and type.
352 * \param env - is the environment passed by the caller
353 * \param qmt - is the quota master target
354 * \param pool_id - is the 16-bit identifier of the pool to look up
355 * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
/*
 * Builds the 32-bit hash key from the pool id and pool type (same encoding
 * as in qmt_pool_alloc(): id + (type << 16)) and looks it up in
 * qmt->qmt_pool_hash, which must have been created beforehand.
 *
 * \retval - ERR_PTR(-ENOENT), after logging an error, when this qmt does
 *           not manage the requested pool. Callers in this file release
 *           the pool obtained on success with qpi_putref().
 */
358 static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
359 struct qmt_device *qmt,
360 int pool_id, int pool_type)
362 struct qmt_pool_info *pool;
366 LASSERT(qmt->qmt_pool_hash != NULL);
368 /* look-up pool in hash table */
369 key = pool_id + (pool_type << 16);
370 pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key);
372 /* this qmt isn't managing this pool! */
373 CERROR("%s: looking up quota entry for a pool (0x%x/%d) which "
374 "isn't managed by this quota master target\n",
375 qmt->qmt_svname, pool_id, pool_type);
376 RETURN(ERR_PTR(-ENOENT));
382 * Functions implementing the pool API, used by the qmt handlers
386 * Destroy all pools which are still in the hash table and free the pool
389 * \param env - is the environment passed by the caller
390 * \param qmt - is the quota master target
/*
 * The pool hash being NULL is handled up front (qmt_pool_init() also calls
 * this function on its error path). Otherwise every pool on
 * qmt->qmt_pool_list is removed from the hash table, unlinked from the list
 * and has its initial reference dropped. The list is walked with
 * list_for_each_safe() because entries are unlinked during the walk. Once
 * the list is empty, the reference on the hash table itself is released
 * and qmt_pool_hash is reset to NULL.
 */
393 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
395 struct qmt_pool_info *pool;
396 struct list_head *pos, *n;
399 if (qmt->qmt_pool_hash == NULL)
402 /* parse list of pool and destroy each element */
403 list_for_each_safe(pos, n, &qmt->qmt_pool_list) {
404 pool = list_entry(pos, struct qmt_pool_info,
406 /* remove from hash */
407 cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key,
410 /* remove from list */
411 list_del_init(&pool->qpi_linkage);
413 /* release extra reference taken in qmt_pool_alloc */
414 qpi_putref(env, pool);
416 LASSERT(list_empty(&qmt->qmt_pool_list));
418 cfs_hash_putref(qmt->qmt_pool_hash);
419 qmt->qmt_pool_hash = NULL;
424 * Initialize pool configuration for the quota master target. For now, we only
425 * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
426 * pool which are instantiated in this function.
428 * \param env - is the environment passed by the caller
429 * \param qmt - is the quota master target for which we have to initialize the
432 * \retval - 0 on success, appropriate error on failure
/*
 * Creates the "POOL_HASH" hash table and the pool list, then calls
 * qmt_pool_alloc() with pool ID 0 for every resource type in
 * [LQUOTA_FIRST_RES, LQUOTA_LAST_RES). qmt_pool_fini() is used to undo the
 * work on the error path.
 */
434 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
439 /* initialize pool hash table */
440 qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH",
443 HASH_POOLS_BKT_BITS, 0,
448 if (qmt->qmt_pool_hash == NULL) {
449 CERROR("%s: failed to create pool hash table\n",
454 /* initialize pool list */
455 INIT_LIST_HEAD(&qmt->qmt_pool_list);
457 /* Instantiate pool master for the default data and metadata pool (both
458 * have pool ID equals to 0).
459 * This code will have to be revisited once we support quota on
460 * non-default pools */
461 for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
462 rc = qmt_pool_alloc(env, qmt, 0, res);
468 qmt_pool_fini(env, qmt);
/*
 * NOTE(review): only the signature is visible here. Judging by its name
 * and parameters this is presumably the per-slave callback handed to
 * lquota_disk_for_each_slv() in qmt_pool_prepare(), with \a arg pointing
 * at the slave counter (qpi_slv_nr[qtype]) - confirm against the body.
 */
473 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
474 char *slv_name, struct lu_fid *slv_fid, void *arg)
485 * Set up on-disk index files associated with each pool.
487 * \param env - is the environment passed by the caller
488 * \param qmt - is the quota master target for which we have to initialize the
490 * \param qmt_root - is the on-disk directory created for the QMT.
492 * \retval - 0 on success, appropriate error on failure
/*
 * Walks qmt->qmt_pool_list and, for every pool, finds or creates the pool
 * directory (named "<resource name>-0x<pool id>") and stores it in
 * qpi_root. Then, for each quota type (0 .. LL_MAXQUOTAS - 1):
 *  - generates the FID of the global index and opens/creates that index,
 *    kept in qpi_glb_obj[qtype];
 *  - for a newly created index (per the in-line comment), writes a record
 *    with zero limits and the default grace time (MAX_IQ_TIME for
 *    LQUOTA_RES_MD pools, MAX_DQ_TIME otherwise) and sets the index
 *    version to 1;
 *  - allocates the lquota site, kept in qpi_site[qtype];
 *  - resets qpi_slv_nr[qtype] and recounts the slave indexes found on disk
 *    with lquota_disk_for_each_slv();
 *  - locates the entry for ID 0, which stores the global grace time, and
 *    caches it in qpi_grace_lqe[qtype];
 *  - under CONFIG_PROC_FS, adds a read-only "glb-<quota type>" procfs file
 *    to dump the global index; a failure there is logged with CWARN.
 * Objects stored in the pool before an error is returned are released
 * later by qmt_pool_free().
 */
494 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
495 struct dt_object *qmt_root)
497 struct qmt_thread_info *qti = qmt_info(env);
498 struct lquota_glb_rec *rec = &qti->qti_glb_rec;
499 struct qmt_pool_info *pool;
500 struct dt_device *dev = NULL;
501 dt_obj_version_t version;
502 struct list_head *pos;
506 LASSERT(qmt->qmt_pool_hash != NULL);
508 /* iterate over each pool in the hash and allocate a quota site for each
509 * one. This involves creating a global index file on disk */
510 list_for_each(pos, &qmt->qmt_pool_list) {
511 struct dt_object *obj;
512 int pool_type, pool_id;
513 struct lquota_entry *lqe;
515 pool = list_entry(pos, struct qmt_pool_info,
/* decode the key built in qmt_pool_alloc(): id in the low 16 bits,
 * resource type above */
518 pool_id = pool->qpi_key & 0x0000ffff;
519 pool_type = pool->qpi_key >> 16;
521 dev = pool->qpi_qmt->qmt_child;
523 /* allocate directory for this pool */
524 sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
525 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
528 RETURN(PTR_ERR(obj));
529 pool->qpi_root = obj;
531 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
532 /* Generating FID of global index in charge of storing
533 * settings for this quota type */
534 lquota_generate_fid(&qti->qti_fid, pool_id, pool_type,
537 /* open/create the global index file for this quota
539 obj = lquota_disk_glb_find_create(env, dev,
541 &qti->qti_fid, false);
544 CERROR("%s: failed to create glb index copy for"
545 " %s type (%d)\n", qmt->qmt_svname,
546 QTYPE_NAME(qtype), rc);
550 pool->qpi_glb_obj[qtype] = obj;
552 version = dt_version_get(env, obj);
553 /* set default grace time for newly created index */
555 rec->qbr_hardlimit = 0;
556 rec->qbr_softlimit = 0;
557 rec->qbr_granted = 0;
558 rec->qbr_time = pool_type == LQUOTA_RES_MD ?
559 MAX_IQ_TIME : MAX_DQ_TIME;
561 rc = lquota_disk_write_glb(env, obj, 0, rec);
563 CERROR("%s: failed to set default "
564 "grace time for %s type (%d)\n",
566 QTYPE_NAME(qtype), rc);
570 rc = lquota_disk_update_ver(env, dev, obj, 1);
572 CERROR("%s: failed to set initial "
573 "version for %s type (%d)\n",
575 QTYPE_NAME(qtype), rc);
580 /* create quota entry site for this quota type */
581 pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
584 if (IS_ERR(pool->qpi_site[qtype])) {
585 rc = PTR_ERR(pool->qpi_site[qtype]);
586 CERROR("%s: failed to create site for %s type "
587 "(%d)\n", qmt->qmt_svname,
588 QTYPE_NAME(qtype), rc);
592 /* count number of slaves which already connected to
593 * the master in the past */
594 pool->qpi_slv_nr[qtype] = 0;
595 rc = lquota_disk_for_each_slv(env, pool->qpi_root,
598 &pool->qpi_slv_nr[qtype]);
600 CERROR("%s: failed to scan & count slave "
601 "indexes for %s type (%d)\n",
602 qmt->qmt_svname, QTYPE_NAME(qtype), rc);
606 /* Global grace time is stored in quota settings of
608 qti->qti_id.qid_uid = 0;
610 /* look-up quota entry storing grace time */
611 lqe = lqe_locate(env, pool->qpi_site[qtype],
614 RETURN(PTR_ERR(lqe));
615 pool->qpi_grace_lqe[qtype] = lqe;
616 #ifdef CONFIG_PROC_FS
617 /* add procfs file to dump the global index, mostly for
618 * debugging purpose */
619 sprintf(qti->qti_buf, "glb-%s", QTYPE_NAME(qtype));
620 rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
621 0444, &lprocfs_quota_seq_fops,
624 CWARN("%s: Error adding procfs file for global"
625 "quota index "DFID", rc:%d\n",
626 qmt->qmt_svname, PFID(&qti->qti_fid), rc);
635 * Handle new slave connection. Called when a slave enqueues the global quota
636 * lock at the beginning of the reintegration procedure.
638 * \param env - is the environment passed by the caller
639 * \param qmt - is the quota master target handling this request
640 * \param glb_fid - is the fid of the global index file
641 * \param slv_fid - is the fid of the newly created slave index file
642 * \param slv_ver - is the current version of the slave index file
643 * \param uuid - is the uuid of slave which is (re)connecting to the master
646 * \retval - 0 on success, appropriate error on failure
/*
 * Visible flow: decode the pool id, pool type and quota type from
 * \a glb_fid, look up the owning pool (a failed lookup is returned as its
 * error code), then look for the slave index file under pool->qpi_root and
 * create it when the lookup reports -ENOENT. On success the slave FID is
 * copied to \a slv_fid and the object's current version is stored in
 * \a slv_ver before the object reference is dropped. qpi_slv_nr[qtype] is
 * incremented - presumably only when the index file was newly created, see
 * 'created'; confirm. The function ends by dropping a pool reference with
 * qpi_putref().
 */
648 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
649 struct lu_fid *glb_fid, struct lu_fid *slv_fid,
650 __u64 *slv_ver, struct obd_uuid *uuid)
652 struct qmt_pool_info *pool;
653 struct dt_object *slv_obj;
654 int pool_id, pool_type, qtype;
655 bool created = false;
658 /* extract pool info from global index FID */
659 rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype);
663 /* look-up pool in charge of this global index FID */
664 pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
666 RETURN(PTR_ERR(pool));
668 /* look-up slave index file */
669 slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
671 if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
672 /* create slave index file */
673 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
674 pool->qpi_root, glb_fid,
/* covers both a lookup error other than -ENOENT and a failed creation */
678 if (IS_ERR(slv_obj)) {
679 rc = PTR_ERR(slv_obj);
680 CERROR("%s: failed to create quota slave index file for %s (%d)"
681 "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
685 /* retrieve slave fid & current object version */
686 memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
687 *slv_ver = dt_version_get(env, slv_obj);
688 lu_object_put(env, &slv_obj->do_lu);
690 pool->qpi_slv_nr[qtype]++;
692 qpi_putref(env, pool);
697 * Look-up a lquota_entry in the pool hash and allocate it if not found.
699 * \param env - is the environment passed by the caller
700 * \param qmt - is the quota master target for which we have to initialize the
702 * \param pool_id - is the 16-bit identifier of the pool
703 * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
704 * \param qtype - is the quota type, either user or group.
705 * \param qid - is the quota ID to look-up
707 * \retval - valid pointer to lquota entry on success, appropriate error on
/*
 * Looks up the pool with qmt_pool_lookup(); when that fails, the error
 * pointer is handed back to the caller as is. ID 0 is special-cased: the
 * pool permanently holds a reference on that entry in
 * qpi_grace_lqe[qtype], so no site lookup is needed. Any other ID is
 * resolved with lqe_locate() in the lquota site of the requested quota
 * type. The function ends by dropping a pool reference with qpi_putref().
 */
710 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
711 struct qmt_device *qmt,
712 int pool_id, int pool_type,
713 int qtype, union lquota_id *qid)
715 struct qmt_pool_info *pool;
716 struct lquota_entry *lqe;
719 /* look-up pool responsible for this global index FID */
720 pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
722 RETURN((void *)pool);
724 if (qid->qid_uid == 0) {
725 /* caller wants to access grace time, no need to look up the
726 * entry since we keep a reference on ID 0 all the time */
727 lqe = pool->qpi_grace_lqe[qtype];
732 /* now that we have the pool, let's look-up the quota entry in the
733 * right quota site */
734 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
736 qpi_putref(env, pool);