Whamcloud - gitweb
47f5b120303374115bffe1f947c62587bd877e05
[fs/lustre-release.git] / lustre / quota / qmt_pool.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2013, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * A Quota Master Target has a hash table where it stores qmt_pool_info
33  * structures. There is one such structure for each pool managed by the QMT.
34  *
35  * Each pool can have different quota types enforced (typically user & group
36  * quota). A pool is in charge of managing lquota_entry structures for each
37  * quota type. This is done by creating one lquota_entry site per quota
38  * type. A site stores entries in a hash table and read quota settings from disk
39  * when a given ID isn't present in the hash.
40  *
41  * The pool API exported here is the following:
42  * - qmt_pool_init(): initializes the general QMT structures used to manage
43  *                    pools.
44  * - qmt_pool_fini(): frees the structures allocated by qmt_pool_fini().
45  * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46  * - qmt_pool_new_conn(): is used to create a new slave index file.
47  * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
48  *                          a given ID.
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
56
57 static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
58
59 /*
60  * Static helper functions not used outside the scope of this file
61  */
62
63 /*
64  * Reference counter management for qmt_pool_info structures
65  */
66 static inline void qpi_getref(struct qmt_pool_info *pool)
67 {
68         atomic_inc(&pool->qpi_ref);
69 }
70
71 static inline void qpi_putref(const struct lu_env *env,
72                               struct qmt_pool_info *pool)
73 {
74         LASSERT(atomic_read(&pool->qpi_ref) > 0);
75         if (atomic_dec_and_test(&pool->qpi_ref))
76                 qmt_pool_free(env, pool);
77 }
78
79 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
80 {
81         LASSERT(atomic_read(&pool->qpi_ref) > 1);
82         atomic_dec(&pool->qpi_ref);
83 }
84
85 /*
86  * Hash functions for qmt_pool_info management
87  */
88
89 static unsigned
90 qpi_hash_hash(struct cfs_hash *hs, const void *key, unsigned mask)
91 {
92         return cfs_hash_u32_hash(*((__u32 *)key), mask);
93 }
94
95 static void *qpi_hash_key(struct hlist_node *hnode)
96 {
97         struct qmt_pool_info *pool;
98         pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
99         return &pool->qpi_key;
100 }
101
102 static int qpi_hash_keycmp(const void *key, struct hlist_node *hnode)
103 {
104         struct qmt_pool_info *pool;
105         pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
106         return pool->qpi_key == *((__u32 *)key);
107 }
108
109 static void *qpi_hash_object(struct hlist_node *hnode)
110 {
111         return hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
112 }
113
114 static void qpi_hash_get(struct cfs_hash *hs, struct hlist_node *hnode)
115 {
116         struct qmt_pool_info *pool;
117         pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
118         qpi_getref(pool);
119 }
120
121 static void qpi_hash_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
122 {
123         struct qmt_pool_info *pool;
124         pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
125         qpi_putref_locked(pool);
126 }
127
128 static void qpi_hash_exit(struct cfs_hash *hs, struct hlist_node *hnode)
129 {
130         CERROR("Should not have any item left!\n");
131 }
132
133 /* vector of hash operations */
134 static struct cfs_hash_ops qpi_hash_ops = {
135         .hs_hash        = qpi_hash_hash,
136         .hs_key         = qpi_hash_key,
137         .hs_keycmp      = qpi_hash_keycmp,
138         .hs_object      = qpi_hash_object,
139         .hs_get         = qpi_hash_get,
140         .hs_put_locked  = qpi_hash_put_locked,
141         .hs_exit        = qpi_hash_exit
142 };
143
144 /* some procfs helpers */
145 static int qpi_state_seq_show(struct seq_file *m, void *data)
146 {
147         struct qmt_pool_info    *pool = m->private;
148         int                      type;
149
150         LASSERT(pool != NULL);
151
152         seq_printf(m, "pool:\n"
153                    "    id: %u\n"
154                    "    type: %s\n"
155                    "    ref: %d\n"
156                    "    least qunit: %lu\n",
157                    pool->qpi_key & 0x0000ffff,
158                    RES_NAME(pool->qpi_key >> 16),
159                    atomic_read(&pool->qpi_ref),
160                    pool->qpi_least_qunit);
161
162         for (type = 0; type < MAXQUOTAS; type++)
163                 seq_printf(m, "    %s:\n"
164                            "        #slv: %d\n"
165                            "        #lqe: %d\n",
166                            QTYPE_NAME(type),
167                            pool->qpi_slv_nr[type],
168                     atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
169
170         return 0;
171 }
172 LPROC_SEQ_FOPS_RO(qpi_state);
173
174 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
175 {
176         struct qmt_pool_info    *pool = m->private;
177         LASSERT(pool != NULL);
178
179         return seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
180 }
181
182 static ssize_t
183 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
184                                size_t count, loff_t *off)
185 {
186         struct qmt_pool_info    *pool;
187         int     qunit, rc;
188         s64     least_qunit;
189
190         pool = ((struct seq_file *)file->private_data)->private;
191         LASSERT(pool != NULL);
192
193         /* Not tuneable for inode limit */
194         if (pool->qpi_key >> 16 != LQUOTA_RES_DT)
195                 return -EINVAL;
196
197         rc = lprocfs_str_to_s64(buffer, count, &least_qunit);
198         if (rc)
199                 return rc;
200
201         /* Miminal qpi_soft_least_qunit */
202         qunit = pool->qpi_least_qunit << 2;
203         /* The value must be power of miminal qpi_soft_least_qunit, see
204          * how the qunit is adjusted in qmt_adjust_qunit(). */
205         while (qunit > 0 && qunit < least_qunit)
206                 qunit <<= 2;
207         if (qunit <= 0)
208                 qunit = INT_MAX & ~3;
209
210         pool->qpi_soft_least_qunit = qunit;
211         return count;
212 }
213 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
214
215 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
216         { .name =       "info",
217           .fops =       &qpi_state_fops },
218         { .name =       "soft_least_qunit",
219           .fops =       &qpi_soft_least_qunit_fops },
220         { NULL }
221 };
222
223 /*
224  * Allocate a new qmt_pool_info structure and add it to the pool hash table
225  * of the qmt.
226  *
227  * \param env       - is the environment passed by the caller
228  * \param qmt       - is the quota master target
229  * \param pool_id   - is the 16-bit pool identifier of the new pool to add
230  * \param pool_type - is the resource type of this pool instance, either
231  *                    LQUOTA_RES_MD or LQUOTA_RES_DT.
232  *
233  * \retval - 0 on success, appropriate error on failure
234  */
235 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
236                           int pool_id, int pool_type)
237 {
238         struct qmt_thread_info  *qti = qmt_info(env);
239         struct qmt_pool_info    *pool;
240         int                      rc = 0;
241         ENTRY;
242
243         OBD_ALLOC_PTR(pool);
244         if (pool == NULL)
245                 RETURN(-ENOMEM);
246         INIT_LIST_HEAD(&pool->qpi_linkage);
247
248         /* assign key used by hash functions */
249         pool->qpi_key = pool_id + (pool_type << 16);
250
251         /* initialize refcount to 1, hash table will then grab an additional
252          * reference */
253         atomic_set(&pool->qpi_ref, 1);
254
255         /* set up least qunit size to use for this pool */
256         pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
257         if (pool_type == LQUOTA_RES_DT)
258                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
259         else
260                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
261
262         /* create pool proc directory */
263         sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
264         pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
265                                           lprocfs_quota_qpi_vars, pool);
266         if (IS_ERR(pool->qpi_proc)) {
267                 rc = PTR_ERR(pool->qpi_proc);
268                 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
269                        qmt->qmt_svname, qti->qti_buf, rc);
270                 pool->qpi_proc = NULL;
271                 GOTO(out, rc);
272         }
273
274         /* grab reference on master target that this pool belongs to */
275         lu_device_get(qmt2lu_dev(qmt));
276         lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
277         pool->qpi_qmt = qmt;
278
279         /* add to qmt hash table */
280         rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key,
281                                  &pool->qpi_hash);
282         if (rc) {
283                 CERROR("%s: failed to add pool %s to qmt hash (%d)\n",
284                        qmt->qmt_svname, qti->qti_buf, rc);
285                 GOTO(out, rc);
286         }
287
288         /* add to qmt pool list */
289         list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
290         EXIT;
291 out:
292         if (rc)
293                 /* this frees the pool structure since refcount is equal to 1 */
294                 qpi_putref(env, pool);
295         return rc;
296 }
297
298 /*
299  * Delete a qmt_pool_info instance and all structures associated.
300  *
301  * \param env  - is the environment passed by the caller
302  * \param pool - is the qmt_pool_info structure to free
303  */
304 static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
305 {
306         int     qtype;
307         ENTRY;
308
309         /* release proc entry */
310         if (pool->qpi_proc) {
311                 lprocfs_remove(&pool->qpi_proc);
312                 pool->qpi_proc = NULL;
313         }
314
315         /* release per-quota type site used to manage quota entries as well as
316          * references to global index files */
317         for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
318                 /* release lqe storing grace time */
319                 if (pool->qpi_grace_lqe[qtype] != NULL)
320                         lqe_putref(pool->qpi_grace_lqe[qtype]);
321
322                 /* release site */
323                 if (pool->qpi_site[qtype] != NULL &&
324                     !IS_ERR(pool->qpi_site[qtype]))
325                         lquota_site_free(env, pool->qpi_site[qtype]);
326                 /* release reference to global index */
327                 if (pool->qpi_glb_obj[qtype] != NULL &&
328                     !IS_ERR(pool->qpi_glb_obj[qtype]))
329                         lu_object_put(env, &pool->qpi_glb_obj[qtype]->do_lu);
330         }
331
332         /* release reference on pool directory */
333         if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
334                 lu_object_put(env, &pool->qpi_root->do_lu);
335
336         /* release reference on the master target */
337         if (pool->qpi_qmt != NULL) {
338                 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
339
340                 lu_device_put(ld);
341                 lu_ref_del(&ld->ld_reference, "pool", pool);
342                 pool->qpi_qmt = NULL;
343         }
344
345         LASSERT(list_empty(&pool->qpi_linkage));
346         OBD_FREE_PTR(pool);
347 }
348
349 /*
350  * Look-up a pool in the hash table based on the pool ID and type.
351  *
352  * \param env     - is the environment passed by the caller
353  * \param qmt     - is the quota master target
354  * \param pool_id   - is the 16-bit identifier of the pool to look up
355  * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
356  *                    LQUOTA_RES_DT.
357  */
358 static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
359                                              struct qmt_device *qmt,
360                                              int pool_id, int pool_type)
361 {
362         struct qmt_pool_info    *pool;
363         __u32                    key;
364         ENTRY;
365
366         LASSERT(qmt->qmt_pool_hash != NULL);
367
368         /* look-up pool in hash table */
369         key = pool_id + (pool_type << 16);
370         pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key);
371         if (pool == NULL) {
372                 /* this qmt isn't managing this pool! */
373                 CERROR("%s: looking up quota entry for a pool (0x%x/%d) which "
374                        "isn't managed by this quota master target\n",
375                        qmt->qmt_svname, pool_id, pool_type);
376                 RETURN(ERR_PTR(-ENOENT));
377         }
378         RETURN(pool);
379 }
380
381 /*
382  * Functions implementing the pool API, used by the qmt handlers
383  */
384
385 /*
386  * Destroy all pools which are still in the hash table and free the pool
387  * hash table.
388  *
389  * \param env - is the environment passed by the caller
390  * \param qmt - is the quota master target
391  *
392  */
393 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
394 {
395         struct qmt_pool_info    *pool;
396         struct list_head        *pos, *n;
397         ENTRY;
398
399         if (qmt->qmt_pool_hash == NULL)
400                 RETURN_EXIT;
401
402         /* parse list of pool and destroy each element */
403         list_for_each_safe(pos, n, &qmt->qmt_pool_list) {
404                 pool = list_entry(pos, struct qmt_pool_info,
405                                   qpi_linkage);
406                 /* remove from hash */
407                 cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key,
408                              &pool->qpi_hash);
409
410                 /* remove from list */
411                 list_del_init(&pool->qpi_linkage);
412
413                 /* release extra reference taken in qmt_pool_alloc */
414                 qpi_putref(env, pool);
415         }
416         LASSERT(list_empty(&qmt->qmt_pool_list));
417
418         cfs_hash_putref(qmt->qmt_pool_hash);
419         qmt->qmt_pool_hash = NULL;
420         EXIT;
421 }
422
423 /*
424  * Initialize pool configure for the quota master target. For now, we only
425  * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
426  * pool which are instantiated in this function.
427  *
428  * \param env - is the environment passed by the caller
429  * \param qmt - is the quota master target for which we have to initialize the
430  *              pool configuration
431  *
432  * \retval - 0 on success, appropriate error on failure
433  */
434 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
435 {
436         int     res, rc = 0;
437         ENTRY;
438
439         /* initialize pool hash table */
440         qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH",
441                                              HASH_POOLS_CUR_BITS,
442                                              HASH_POOLS_MAX_BITS,
443                                              HASH_POOLS_BKT_BITS, 0,
444                                              CFS_HASH_MIN_THETA,
445                                              CFS_HASH_MAX_THETA,
446                                              &qpi_hash_ops,
447                                              CFS_HASH_DEFAULT);
448         if (qmt->qmt_pool_hash == NULL) {
449                 CERROR("%s: failed to create pool hash table\n",
450                        qmt->qmt_svname);
451                 RETURN(-ENOMEM);
452         }
453
454         /* initialize pool list */
455         INIT_LIST_HEAD(&qmt->qmt_pool_list);
456
457         /* Instantiate pool master for the default data and metadata pool (both
458          * have pool ID equals to 0).
459          * This code will have to be revisited once we support quota on
460          * non-default pools */
461         for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
462                 rc = qmt_pool_alloc(env, qmt, 0, res);
463                 if (rc)
464                         break;
465         }
466
467         if (rc)
468                 qmt_pool_fini(env, qmt);
469
470         RETURN(rc);
471 }
472
473 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
474                        char *slv_name, struct lu_fid *slv_fid, void *arg)
475 {
476         int *nr = arg;
477
478         /* one more slave */
479         (*nr)++;
480
481         return 0;
482 }
483
484 /*
485  * Set up on-disk index files associated with each pool.
486  *
487  * \param env - is the environment passed by the caller
488  * \param qmt - is the quota master target for which we have to initialize the
489  *              pool configuration
490  * \param qmt_root - is the on-disk directory created for the QMT.
491  *
492  * \retval - 0 on success, appropriate error on failure
493  */
494 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
495                      struct dt_object *qmt_root)
496 {
497         struct qmt_thread_info  *qti = qmt_info(env);
498         struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
499         struct qmt_pool_info    *pool;
500         struct dt_device        *dev = NULL;
501         dt_obj_version_t         version;
502         struct list_head        *pos;
503         int                      rc = 0, qtype;
504         ENTRY;
505
506         LASSERT(qmt->qmt_pool_hash != NULL);
507
508         /* iterate over each pool in the hash and allocate a quota site for each
509          * one. This involves creating a global index file on disk */
510         list_for_each(pos, &qmt->qmt_pool_list) {
511                 struct dt_object        *obj;
512                 int                      pool_type, pool_id;
513                 struct lquota_entry     *lqe;
514
515                 pool = list_entry(pos, struct qmt_pool_info,
516                                   qpi_linkage);
517
518                 pool_id   = pool->qpi_key & 0x0000ffff;
519                 pool_type = pool->qpi_key >> 16;
520                 if (dev == NULL)
521                         dev = pool->qpi_qmt->qmt_child;
522
523                 /* allocate directory for this pool */
524                 sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
525                 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
526                                                   qti->qti_buf);
527                 if (IS_ERR(obj))
528                         RETURN(PTR_ERR(obj));
529                 pool->qpi_root = obj;
530
531                 for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
532                         /* Generating FID of global index in charge of storing
533                          * settings for this quota type */
534                         lquota_generate_fid(&qti->qti_fid, pool_id, pool_type,
535                                             qtype);
536
537                         /* open/create the global index file for this quota
538                          * type */
539                         obj = lquota_disk_glb_find_create(env, dev,
540                                                           pool->qpi_root,
541                                                           &qti->qti_fid, false);
542                         if (IS_ERR(obj)) {
543                                 rc = PTR_ERR(obj);
544                                 CERROR("%s: failed to create glb index copy for"
545                                        " %s type (%d)\n", qmt->qmt_svname,
546                                        QTYPE_NAME(qtype), rc);
547                                 RETURN(rc);
548                         }
549
550                         pool->qpi_glb_obj[qtype] = obj;
551
552                         version = dt_version_get(env, obj);
553                         /* set default grace time for newly created index */
554                         if (version == 0) {
555                                 rec->qbr_hardlimit = 0;
556                                 rec->qbr_softlimit = 0;
557                                 rec->qbr_granted = 0;
558                                 rec->qbr_time = pool_type == LQUOTA_RES_MD ?
559                                         MAX_IQ_TIME : MAX_DQ_TIME;
560
561                                 rc = lquota_disk_write_glb(env, obj, 0, rec);
562                                 if (rc) {
563                                         CERROR("%s: failed to set default "
564                                                "grace time for %s type (%d)\n",
565                                                qmt->qmt_svname,
566                                                QTYPE_NAME(qtype), rc);
567                                         RETURN(rc);
568                                 }
569
570                                 rc = lquota_disk_update_ver(env, dev, obj, 1);
571                                 if (rc) {
572                                         CERROR("%s: failed to set initial "
573                                                "version for %s type (%d)\n",
574                                                qmt->qmt_svname,
575                                                QTYPE_NAME(qtype), rc);
576                                         RETURN(rc);
577                                 }
578                         }
579
580                         /* create quota entry site for this quota type */
581                         pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
582                                                                   true, qtype,
583                                                                   &qmt_lqe_ops);
584                         if (IS_ERR(pool->qpi_site[qtype])) {
585                                 rc = PTR_ERR(pool->qpi_site[qtype]);
586                                 CERROR("%s: failed to create site for %s type "
587                                        "(%d)\n", qmt->qmt_svname,
588                                        QTYPE_NAME(qtype), rc);
589                                 RETURN(rc);
590                         }
591
592                         /* count number of slaves which already connected to
593                          * the master in the past */
594                         pool->qpi_slv_nr[qtype] = 0;
595                         rc = lquota_disk_for_each_slv(env, pool->qpi_root,
596                                                       &qti->qti_fid,
597                                                       qmt_slv_cnt,
598                                                       &pool->qpi_slv_nr[qtype]);
599                         if (rc) {
600                                 CERROR("%s: failed to scan & count slave "
601                                        "indexes for %s type (%d)\n",
602                                        qmt->qmt_svname, QTYPE_NAME(qtype), rc);
603                                 RETURN(rc);
604                         }
605
606                         /* Global grace time is stored in quota settings of
607                          * ID 0. */
608                         qti->qti_id.qid_uid = 0;
609
610                         /* look-up quota entry storing grace time */
611                         lqe = lqe_locate(env, pool->qpi_site[qtype],
612                                          &qti->qti_id);
613                         if (IS_ERR(lqe))
614                                 RETURN(PTR_ERR(lqe));
615                         pool->qpi_grace_lqe[qtype] = lqe;
616 #ifdef CONFIG_PROC_FS
617                         /* add procfs file to dump the global index, mostly for
618                          * debugging purpose */
619                         sprintf(qti->qti_buf, "glb-%s", QTYPE_NAME(qtype));
620                         rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
621                                                 0444, &lprocfs_quota_seq_fops,
622                                                 obj);
623                         if (rc)
624                                 CWARN("%s: Error adding procfs file for global"
625                                       "quota index "DFID", rc:%d\n",
626                                       qmt->qmt_svname, PFID(&qti->qti_fid), rc);
627 #endif
628                 }
629         }
630
631         RETURN(0);
632 }
633
634 /*
635  * Handle new slave connection. Called when a slave enqueues the global quota
636  * lock at the beginning of the reintegration procedure.
637  *
638  * \param env - is the environment passed by the caller
639  * \parap qmt - is the quota master target handling this request
640  * \param glb_fid - is the fid of the global index file
641  * \param slv_fid - is the fid of the newly created slave index file
642  * \param slv_ver - is the current version of the slave index file
643  * \param uuid    - is the uuid of slave which is (re)connecting to the master
644  *                  target
645  *
646  * \retval - 0 on success, appropriate error on failure
647  */
648 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
649                       struct lu_fid *glb_fid, struct lu_fid *slv_fid,
650                       __u64 *slv_ver, struct obd_uuid *uuid)
651 {
652         struct qmt_pool_info    *pool;
653         struct dt_object        *slv_obj;
654         int                      pool_id, pool_type, qtype;
655         bool                     created = false;
656         int                      rc = 0;
657
658         /* extract pool info from global index FID */
659         rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype);
660         if (rc)
661                 RETURN(rc);
662
663         /* look-up pool in charge of this global index FID */
664         pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
665         if (IS_ERR(pool))
666                 RETURN(PTR_ERR(pool));
667
668         /* look-up slave index file */
669         slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
670                                        glb_fid, uuid);
671         if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
672                 /* create slave index file */
673                 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
674                                                       pool->qpi_root, glb_fid,
675                                                       uuid, false);
676                 created = true;
677         }
678         if (IS_ERR(slv_obj)) {
679                 rc = PTR_ERR(slv_obj);
680                 CERROR("%s: failed to create quota slave index file for %s (%d)"
681                        "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
682                 GOTO(out, rc);
683         }
684
685         /* retrieve slave fid & current object version */
686         memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
687         *slv_ver = dt_version_get(env, slv_obj);
688         lu_object_put(env, &slv_obj->do_lu);
689         if (created)
690                 pool->qpi_slv_nr[qtype]++;
691 out:
692         qpi_putref(env, pool);
693         RETURN(rc);
694 }
695
696 /*
697  * Look-up a lquota_entry in the pool hash and allocate it if not found.
698  *
699  * \param env - is the environment passed by the caller
700  * \param qmt - is the quota master target for which we have to initialize the
701  *              pool configuration
702  * \param pool_id   - is the 16-bit identifier of the pool
703  * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
704  * \param qtype     - is the quota type, either user or group.
705  * \param qid       - is the quota ID to look-up
706  *
707  * \retval - valid pointer to lquota entry on success, appropriate error on
708  *           failure
709  */
710 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
711                                          struct qmt_device *qmt,
712                                          int pool_id, int pool_type,
713                                          int qtype, union lquota_id *qid)
714 {
715         struct qmt_pool_info    *pool;
716         struct lquota_entry     *lqe;
717         ENTRY;
718
719         /* look-up pool responsible for this global index FID */
720         pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
721         if (IS_ERR(pool))
722                 RETURN((void *)pool);
723
724         if (qid->qid_uid == 0) {
725                 /* caller wants to access grace time, no need to look up the
726                  * entry since we keep a reference on ID 0 all the time */
727                 lqe = pool->qpi_grace_lqe[qtype];
728                 lqe_getref(lqe);
729                 GOTO(out, lqe);
730         }
731
732         /* now that we have the pool, let's look-up the quota entry in the
733          * right quota site */
734         lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
735 out:
736         qpi_putref(env, pool);
737         RETURN(lqe);
738 }