4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2012 Intel, Inc.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 * A Quota Master Target has a hash table where it stores qmt_pool_info
33 * structures. There is one such structure for each pool managed by the QMT.
35 * Each pool can have different quota types enforced (typically user & group
36 * quota). A pool is in charge of managing lquota_entry structures for each
37 * quota type. This is done by creating one lquota_entry site per quota
38 * type. A site stores entries in a hash table and reads quota settings from disk
39 * when a given ID isn't present in the hash.
41 * The pool API exported here is the following:
42 * - qmt_pool_init(): initializes the general QMT structures used to manage
44 * - qmt_pool_fini(): frees the structures allocated by qmt_pool_init().
45 * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46 * - qmt_pool_new_conn(): is used to create a new slave index file.
47 * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
52 # define EXPORT_SYMTAB
55 #define DEBUG_SUBSYSTEM S_LQUOTA
57 #include <obd_class.h>
58 #include <lprocfs_status.h>
59 #include "qmt_internal.h"
61 static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
64 * Static helper functions not used outside the scope of this file
68 * Reference counter management for qmt_pool_info structures
70 static inline void qpi_getref(struct qmt_pool_info *pool)
72 cfs_atomic_inc(&pool->qpi_ref);
75 static inline void qpi_putref(const struct lu_env *env,
76 struct qmt_pool_info *pool)
78 LASSERT(atomic_read(&pool->qpi_ref) > 0);
79 if (cfs_atomic_dec_and_test(&pool->qpi_ref))
80 qmt_pool_free(env, pool);
83 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
85 LASSERT(cfs_atomic_read(&pool->qpi_ref) > 1);
86 cfs_atomic_dec(&pool->qpi_ref);
90 * Hash functions for qmt_pool_info management
93 static unsigned qpi_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask)
95 return cfs_hash_u32_hash(*((__u32 *)key), mask);
98 static void *qpi_hash_key(cfs_hlist_node_t *hnode)
100 struct qmt_pool_info *pool;
101 pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
102 return &pool->qpi_key;
105 static int qpi_hash_keycmp(const void *key, cfs_hlist_node_t *hnode)
107 struct qmt_pool_info *pool;
108 pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
109 return pool->qpi_key == *((__u32 *)key);
112 static void *qpi_hash_object(cfs_hlist_node_t *hnode)
114 return cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
117 static void qpi_hash_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
119 struct qmt_pool_info *pool;
120 pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
124 static void qpi_hash_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
126 struct qmt_pool_info *pool;
127 pool = cfs_hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
128 qpi_putref_locked(pool);
131 static void qpi_hash_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
133 CERROR("Should not have any item left!\n");
136 /* vector of hash operations */
137 static cfs_hash_ops_t qpi_hash_ops = {
138 .hs_hash = qpi_hash_hash,
139 .hs_key = qpi_hash_key,
140 .hs_keycmp = qpi_hash_keycmp,
141 .hs_object = qpi_hash_object,
142 .hs_get = qpi_hash_get,
143 .hs_put_locked = qpi_hash_put_locked,
144 .hs_exit = qpi_hash_exit
147 /* some procfs helpers */
/*
 * procfs read handler dumping the state of one pool into \a page.
 *
 * \a data is the qmt_pool_info to describe and is asserted to be non-NULL.
 * The first snprintf() receives the pool ID (low 16 bits of qpi_key), the
 * resource name (RES_NAME() of the bits above 16), the current reference
 * count and the least qunit. The loop then appends, for each quota type
 * below MAXQUOTAS, the slave count and the hs_count of that type's site hash.
 *
 * NOTE(review): the embedded original line numbers skip 150, 152-153,
 * 157-160, 170-173 and 176-179, so the braces, the local declarations, most
 * of both format strings and the return statement are not visible in this
 * copy.
 * NOTE(review): if snprintf() follows the C99 convention of returning the
 * untruncated length, "count - i" may become negative on truncation -
 * confirm how the missing lines guard against it.
 */
148 static int lprocfs_qpi_rd_state(char *page, char **start, off_t off,
149 int count, int *eof, void *data)
151 struct qmt_pool_info *pool = (struct qmt_pool_info *)data;
154 LASSERT(pool != NULL);
156 i = snprintf(page, count,
161 " least qunit: %lu\n",
/* qpi_key is built as pool_id + (pool_type << 16), decode both halves */
162 pool->qpi_key & 0x0000ffff,
163 RES_NAME(pool->qpi_key >> 16),
164 cfs_atomic_read(&pool->qpi_ref),
165 pool->qpi_least_qunit);
/* one additional record per quota type, appended after the header */
168 for (type = 0; type < MAXQUOTAS; type++)
169 i += snprintf(page + i, count - i,
174 pool->qpi_slv_nr[type],
175 cfs_atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
/*
 * procfs variables passed to lprocfs_register() for each pool directory:
 * the "info" file is served by lprocfs_qpi_rd_state().
 * NOTE(review): the end of the array (original lines 182-183) is not visible
 * in this copy.
 */
180 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
181 { "info", lprocfs_qpi_rd_state, 0, 0},
186 * Allocate a new qmt_pool_info structure and add it to the pool hash table
189 * \param env - is the environment passed by the caller
190 * \param qmt - is the quota master target
191 * \param pool_id - is the 16-bit pool identifier of the new pool to add
192 * \param pool_type - is the resource type of this pool instance, either
193 * LQUOTA_RES_MD or LQUOTA_RES_DT.
195 * \retval - 0 on success, appropriate error on failure
/*
 * Set up a new qmt_pool_info and publish it in the QMT hash and pool list.
 *
 * Visible steps, in order:
 *  - the list linkage is initialized and the hash key is computed as
 *    pool_id + (pool_type << 16);
 *  - the refcount is set to 1 and the least qunit is taken from
 *    LQUOTA_LEAST_QUNIT(pool_type);
 *  - a proc directory named "<RES_NAME>-0x<pool_id>" is registered under
 *    qmt->qmt_proc with lprocfs_quota_qpi_vars; a failure is logged and
 *    qpi_proc is reset to NULL;
 *  - a reference is taken on the QMT lu_device and recorded with lu_ref_add();
 *  - the pool is added to qmt->qmt_pool_hash with cfs_hash_add_unique(), a
 *    failure is logged;
 *  - the pool is appended to qmt->qmt_pool_list;
 *  - qpi_putref() is called, which frees the pool when the refcount is 1.
 *
 * NOTE(review): the embedded original line numbers skip several lines (among
 * others 202-207, 229-231, 235, 239-240, 243-245 and 248-250), so the
 * allocation of \a pool, the declaration of rc, the error branches and the
 * condition guarding the final qpi_putref() are not visible in this copy.
 * NOTE(review): atomic_set() is used here while the refcount helpers of this
 * file use the cfs_atomic_* wrappers - presumably equivalent, confirm and
 * align.
 * NOTE(review): sprintf() into qti->qti_buf is unbounded; the buffer size is
 * not visible from here.
 */
197 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
198 int pool_id, int pool_type)
200 struct qmt_thread_info *qti = qmt_info(env);
201 struct qmt_pool_info *pool;
208 CFS_INIT_LIST_HEAD(&pool->qpi_linkage);
210 /* assign key used by hash functions */
211 pool->qpi_key = pool_id + (pool_type << 16);
213 /* initialize refcount to 1, hash table will then grab an additional
215 atomic_set(&pool->qpi_ref, 1);
217 /* set up least qunit size to use for this pool */
218 pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
220 /* create pool proc directory */
221 sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
222 pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
223 lprocfs_quota_qpi_vars, pool);
224 if (IS_ERR(pool->qpi_proc)) {
225 rc = PTR_ERR(pool->qpi_proc);
226 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
227 qmt->qmt_svname, qti->qti_buf, rc);
/* cleared so that qmt_pool_free(), which only tests qpi_proc for non-NULL,
 * does not hand an error pointer to lprocfs_remove() */
228 pool->qpi_proc = NULL;
232 /* grab reference on master target that this pool belongs to */
233 lu_device_get(qmt2lu_dev(qmt));
234 lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
237 /* add to qmt hash table */
238 rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key,
241 CERROR("%s: failed to add pool %s to qmt hash (%d)\n",
242 qmt->qmt_svname, qti->qti_buf, rc);
246 /* add to qmt pool list */
247 cfs_list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
251 /* this frees the pool structure since refcount is equal to 1 */
252 qpi_putref(env, pool);
257 * Delete a qmt_pool_info instance and all structures associated.
259 * \param env - is the environment passed by the caller
260 * \param pool - is the qmt_pool_info structure to free
/*
 * Release everything attached to \a pool. Called from qpi_putref() once the
 * last reference is dropped.
 *
 * Visible steps, in order:
 *  - the proc entry is removed if one is registered;
 *  - for each quota type below MAXQUOTAS, the lquota site is freed and the
 *    reference on the global index object is dropped, both only when the
 *    pointer is neither NULL nor an error pointer;
 *  - the reference on the pool directory object is dropped under the same
 *    NULL / IS_ERR() guard;
 *  - the lu_ref on the QMT device is deleted and qpi_qmt is reset;
 *  - the pool is asserted to be off the QMT pool list.
 *
 * NOTE(review): the embedded original line numbers skip 263-266, 271-272,
 * 276, 284-285, 289, 293-294, 297-298 and 300 onwards, so the local
 * declarations, closing braces, the statement at line 294 and the final
 * release of the structure itself are not visible in this copy.
 */
262 static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
267 /* release proc entry */
268 if (pool->qpi_proc) {
269 lprocfs_remove(&pool->qpi_proc);
270 pool->qpi_proc = NULL;
273 /* release per-quota type site used to manage quota entries as well as
274 * references to global index files */
275 for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
/* qmt_pool_prepare() stores the raw return value of lquota_site_alloc() in
 * qpi_site[], which may be an error pointer, hence the IS_ERR() check */
277 if (pool->qpi_site[qtype] != NULL &&
278 !IS_ERR(pool->qpi_site[qtype]))
279 lquota_site_free(env, pool->qpi_site[qtype]);
280 /* release reference to global index */
281 if (pool->qpi_glb_obj[qtype] != NULL &&
282 !IS_ERR(pool->qpi_glb_obj[qtype]))
283 lu_object_put(env, &pool->qpi_glb_obj[qtype]->do_lu);
286 /* release reference on pool directory */
287 if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
288 lu_object_put(env, &pool->qpi_root->do_lu);
290 /* release reference on the master target */
291 if (pool->qpi_qmt != NULL) {
292 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
/* same "pool" scope and source as the lu_ref_add() done in qmt_pool_alloc() */
295 lu_ref_del(&ld->ld_reference, "pool", pool);
296 pool->qpi_qmt = NULL;
299 LASSERT(cfs_list_empty(&pool->qpi_linkage));
304 * Look-up a pool in the hash table based on the pool ID and type.
306 * \param env - is the environment passed by the caller
307 * \param qmt - is the quota master target
308 * \param pool_id - is the 16-bit identifier of the pool to look up
309 * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
/*
 * Find the pool identified by (\a pool_id, \a pool_type) in the QMT hash.
 *
 * The lookup key uses the same layout as in qmt_pool_alloc(), i.e.
 * pool_id + (pool_type << 16). The visible failure path logs an error and
 * returns ERR_PTR(-ENOENT).
 *
 * Both visible callers (qmt_pool_new_conn() and qmt_pool_lqe_lookup()) call
 * qpi_putref() on the returned pool once done with it.
 * NOTE(review): presumably the reference they drop is the one taken by the
 * hash get callback during cfs_hash_lookup() - confirm.
 * NOTE(review): the embedded original line numbers skip 315, 317-319, 325
 * and 331 onwards, so the declaration of key, the condition guarding the
 * error path and the success return are not visible in this copy.
 */
312 static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
313 struct qmt_device *qmt,
314 int pool_id, int pool_type)
316 struct qmt_pool_info *pool;
320 LASSERT(qmt->qmt_pool_hash != NULL);
322 /* look-up pool in hash table */
323 key = pool_id + (pool_type << 16);
324 pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key);
326 /* this qmt isn't managing this pool! */
327 CERROR("%s: looking up quota entry for a pool (0x%x/%d) which "
328 "isn't managed by this quota master target\n",
329 qmt->qmt_svname, pool_id, pool_type);
330 RETURN(ERR_PTR(-ENOENT));
336 * Functions implementing the pool API, used by the qmt handlers
340 * Destroy all pools which are still in the hash table and free the pool
343 * \param env - is the environment passed by the caller
344 * \param qmt - is the quota master target
/*
 * Tear down every pool of \a qmt as well as the pool hash itself.
 *
 * Each pool found on qmt->qmt_pool_list is removed from the hash, unlinked
 * from the list and released with qpi_putref(). The list is then asserted
 * to be empty, the reference on the hash is dropped and qmt_pool_hash is
 * reset to NULL. The NULL test on qmt_pool_hash at the top handles a QMT
 * whose hash does not exist.
 *
 * NOTE(review): the embedded original line numbers skip 348, 350-352,
 * 354-355, 359, 362-363, 366, 369, 371 and 374 onwards, so the declarations
 * of pos and n, the statement executed when the hash is NULL and the last
 * arguments of cfs_list_entry() and cfs_hash_del() are not visible in this
 * copy.
 */
347 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
349 struct qmt_pool_info *pool;
353 if (qmt->qmt_pool_hash == NULL)
356 /* parse list of pool and destroy each element */
/* the _safe iterator is needed since the current entry is unlinked below */
357 cfs_list_for_each_safe(pos, n, &qmt->qmt_pool_list) {
358 pool = cfs_list_entry(pos, struct qmt_pool_info,
360 /* remove from hash */
361 cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key,
364 /* remove from list */
365 cfs_list_del_init(&pool->qpi_linkage);
367 /* release extra reference taken in qmt_pool_alloc */
368 qpi_putref(env, pool);
370 LASSERT(cfs_list_empty(&qmt->qmt_pool_list));
372 cfs_hash_putref(qmt->qmt_pool_hash);
373 qmt->qmt_pool_hash = NULL;
378 * Initialize pool configuration for the quota master target. For now, we only
379 * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
380 * pool which are instantiated in this function.
382 * \param env - is the environment passed by the caller
383 * \param qmt - is the quota master target for which we have to initialize the
386 * \retval - 0 on success, appropriate error on failure
/*
 * Create the pool hash and pool list of \a qmt and instantiate the default
 * pools.
 *
 * The hash is created under the name "POOL_HASH"; a NULL result is logged.
 * The pool list head is then initialized and qmt_pool_alloc() is called
 * with pool ID 0 for every resource type from LQUOTA_FIRST_RES up to, but
 * not including, LQUOTA_LAST_RES. qmt_pool_fini() is also called from here.
 *
 * NOTE(review): the embedded original line numbers skip 389-392, 395-396,
 * 398-401, 404-407, 410, 417-421 and 423 onwards, so most cfs_hash_create()
 * arguments, the declarations of res and rc, the handling of a
 * qmt_pool_alloc() failure, the condition under which qmt_pool_fini() runs
 * and the return statement are not visible in this copy.
 */
388 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
393 /* initialize pool hash table */
394 qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH",
397 HASH_POOLS_BKT_BITS, 0,
402 if (qmt->qmt_pool_hash == NULL) {
403 CERROR("%s: failed to create pool hash table\n",
408 /* initialize pool list */
409 CFS_INIT_LIST_HEAD(&qmt->qmt_pool_list);
411 /* Instantiate pool master for the default data and metadata pool (both
412 * have pool ID equals to 0).
413 * This code will have to be revisited once we support quota on
414 * non-default pools */
415 for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
416 rc = qmt_pool_alloc(env, qmt, 0, res);
422 qmt_pool_fini(env, qmt);
/*
 * NOTE(review): only the signature of this helper is visible in this copy;
 * the embedded original line numbers skip its whole body (429-437).
 * The name and the "count number of slaves" step of qmt_pool_prepare()
 * suggest a per-slave-index callback incrementing the counter passed
 * through \a arg - confirm against the complete source.
 */
427 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
428 char *slv_name, struct lu_fid *slv_fid, void *arg)
439 * Set up on-disk index files associated with each pool.
441 * \param env - is the environment passed by the caller
442 * \param qmt - is the quota master target for which we have to initialize the
444 * \param qmt_root - is the on-disk directory created for the QMT.
446 * \retval - 0 on success, appropriate error on failure
/*
 * Set up the on-disk objects and in-memory sites of every pool found on
 * qmt->qmt_pool_list.
 *
 * For each pool:
 *  - the pool ID and type are decoded from qpi_key (low 16 bits and the
 *    bits above 16 respectively);
 *  - the pool directory "<RES_NAME>-0x<pool_id>" is looked up or created
 *    under \a qmt_root and stored in qpi_root; the visible error path
 *    returns PTR_ERR() of that object.
 * Then, for each quota type below MAXQUOTAS:
 *  - the FID of the global index is generated into qti->qti_fid;
 *  - the global index object is opened or created and stored in
 *    qpi_glb_obj[qtype];
 *  - the lquota site is allocated and stored in qpi_site[qtype], an error
 *    pointer being logged;
 *  - qpi_slv_nr[qtype] is reset to 0 and its address is passed to
 *    lquota_disk_for_each_slv() to count the existing slave indexes;
 *  - a read-only (0444) procfs seq file "glb-<QTYPE_NAME>" is created in the
 *    pool proc directory; a failure there is reported with CWARN.
 *
 * NOTE(review): the embedded original line numbers skip many lines (among
 * others 454-457, 467-468, 477-478, 486-487, 489, 491, 493-494, 498-500,
 * 505-506, 512-514, 519-520, 522, 526-528, 534-535 and 539 onwards), so the
 * declarations of pos, qtype and rc, several call arguments, every error
 * branch and the final return are not visible in this copy.
 * NOTE(review): the CWARN format concatenates "for global" and "quota index"
 * without a separating space, so the message reads "globalquota".
 * NOTE(review): both sprintf() calls into qti->qti_buf are unbounded; the
 * buffer size is not visible from here.
 */
448 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
449 struct dt_object *qmt_root)
451 struct qmt_thread_info *qti = qmt_info(env);
452 struct qmt_pool_info *pool;
453 struct dt_device *dev = NULL;
458 LASSERT(qmt->qmt_pool_hash != NULL);
460 /* iterate over each pool in the hash and allocate a quota site for each
461 * one. This involves creating a global index file on disk */
462 cfs_list_for_each(pos, &qmt->qmt_pool_list) {
463 struct dt_object *obj;
464 int pool_type, pool_id;
466 pool = cfs_list_entry(pos, struct qmt_pool_info,
469 pool_id = pool->qpi_key & 0x0000ffff;
470 pool_type = pool->qpi_key >> 16;
472 dev = pool->qpi_qmt->qmt_child;
474 /* allocate directory for this pool */
475 sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
476 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
479 RETURN(PTR_ERR(obj));
480 pool->qpi_root = obj;
482 for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
483 /* Generating FID of global index in charge of storing
484 * settings for this quota type */
485 lquota_generate_fid(&qti->qti_fid, pool_id, pool_type,
488 /* open/create the global index file for this quota
490 obj = lquota_disk_glb_find_create(env, dev,
492 &qti->qti_fid, false);
495 CERROR("%s: failed to create glb index copy for"
496 " %s type (%d)\n", qmt->qmt_svname,
497 QTYPE_NAME(qtype), rc);
501 pool->qpi_glb_obj[qtype] = obj;
503 /* create quota entry site for this quota type */
504 pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
507 if (IS_ERR(pool->qpi_site[qtype])) {
508 rc = PTR_ERR(pool->qpi_site[qtype]);
509 CERROR("%s: failed to create site for %s type "
510 "(%d)\n", qmt->qmt_svname,
511 QTYPE_NAME(qtype), rc);
515 /* count number of slaves which already connected to
516 * the master in the past */
517 pool->qpi_slv_nr[qtype] = 0;
518 rc = lquota_disk_for_each_slv(env, pool->qpi_root,
521 &pool->qpi_slv_nr[qtype]);
523 CERROR("%s: failed to scan & count slave "
524 "indexes for %s type (%d)\n",
525 qmt->qmt_svname, QTYPE_NAME(qtype), rc);
529 /* add procfs file to dump the global index, mostly for
530 * debugging purpose */
531 sprintf(qti->qti_buf, "glb-%s", QTYPE_NAME(qtype));
532 rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
533 0444, &lprocfs_quota_seq_fops,
536 CWARN("%s: Error adding procfs file for global"
537 "quota index "DFID", rc:%d\n",
538 qmt->qmt_svname, PFID(&qti->qti_fid), rc);
547 * Handle new slave connection. Called when a slave enqueues the global quota
548 * lock at the beginning of the reintegration procedure.
550 * \param env - is the environment passed by the caller
551 * \param qmt - is the quota master target handling this request
552 * \param glb_fid - is the fid of the global index file
553 * \param slv_fid - is the fid of the newly created slave index file
554 * \param slv_ver - is the current version of the slave index file
555 * \param uuid - is the uuid of slave which is (re)connecting to the master
558 * \retval - 0 on success, appropriate error on failure
/*
 * Handle a slave (re)connection for the global index \a glb_fid.
 *
 * Visible steps, in order:
 *  - the pool ID, pool type and quota type are extracted from \a glb_fid;
 *  - the matching pool is looked up; the visible error path returns
 *    PTR_ERR(pool);
 *  - the slave index file is searched in the pool directory and created
 *    with lquota_disk_slv_find_create() when the search fails with -ENOENT;
 *  - a remaining error on the slave object is logged with the slave UUID;
 *  - the FID of the slave index is copied to \a slv_fid, its version is
 *    stored in \a slv_ver and the object reference is dropped;
 *  - qpi_slv_nr[qtype] is incremented;
 *  - the pool reference obtained from the lookup is dropped.
 *
 * NOTE(review): the embedded original line numbers skip 563, 568-569,
 * 572-574, 577, 582, 587-589, 594-596, 601, 603 and 605 onwards, so the
 * declaration of rc, the error checks after lquota_extract_fid() and
 * qmt_pool_lookup(), the place where "created" is set, the condition (if
 * any) guarding the slave counter increment, the error exit and the final
 * return are not visible in this copy.
 * NOTE(review): qtype comes from the FID and indexes qpi_slv_nr[]; no range
 * check is visible here - confirm lquota_extract_fid() validates it.
 */
560 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
561 struct lu_fid *glb_fid, struct lu_fid *slv_fid,
562 __u64 *slv_ver, struct obd_uuid *uuid)
564 struct qmt_pool_info *pool;
565 struct dt_object *slv_obj;
566 int pool_id, pool_type, qtype;
567 bool created = false;
570 /* extract pool info from global index FID */
571 rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype);
575 /* look-up pool in charge of this global index FID */
576 pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
578 RETURN(PTR_ERR(pool));
580 /* look-up slave index file */
581 slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
/* only a missing index triggers creation, any other error is kept as is */
583 if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
584 /* create slave index file */
585 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
586 pool->qpi_root, glb_fid,
590 if (IS_ERR(slv_obj)) {
591 rc = PTR_ERR(slv_obj);
592 CERROR("%s: failed to create quota slave index file for %s (%d)"
593 "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
597 /* retrieve slave fid & current object version */
598 memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
599 *slv_ver = dt_version_get(env, slv_obj);
600 lu_object_put(env, &slv_obj->do_lu);
602 pool->qpi_slv_nr[qtype]++;
604 qpi_putref(env, pool);
609 * Look-up a lquota_entry in the pool hash and allocate it if not found.
611 * \param env - is the environment passed by the caller
612 * \param qmt - is the quota master target for which we have to initialize the
614 * \param pool_id - is the 16-bit identifier of the pool
615 * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
616 * \param qtype - is the quota type, either user or group.
617 * \param qid - is the quota ID to look-up
619 * \retval - valid pointer to lquota entry on success, appropriate error on
/*
 * Locate the lquota entry of \a qid in the site of quota type \a qtype of
 * the pool identified by (\a pool_id, \a pool_type).
 *
 * The pool is looked up first; the visible early exit returns the pool
 * pointer itself cast to void *, which carries the error pointer produced
 * by qmt_pool_lookup(). Otherwise lqe_locate() is called on
 * pool->qpi_site[qtype] and the pool reference obtained from the lookup is
 * dropped.
 *
 * NOTE(review): the embedded original line numbers skip 626, 629-630, 633,
 * 635, 639 and 641 onwards, so the condition guarding the early exit and the
 * statement returning lqe are not visible in this copy.
 * NOTE(review): \a qtype indexes qpi_site[] without a visible range check -
 * confirm that callers validate it.
 */
622 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
623 struct qmt_device *qmt,
624 int pool_id, int pool_type,
625 int qtype, union lquota_id *qid)
627 struct qmt_pool_info *pool;
628 struct lquota_entry *lqe;
631 /* look-up pool responsible for this global index FID */
632 pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
634 RETURN((void *)pool);
636 /* now that we have the pool, let's look-up the quota entry in the
637 * right quota site */
638 lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
640 qpi_putref(env, pool);