Whamcloud - gitweb
LU-8851 nodemap: add uid/gid only flags to control mapping
[fs/lustre-release.git] / lustre / quota / qmt_pool.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2016, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * A Quota Master Target has a hash table where it stores qmt_pool_info
33  * structures. There is one such structure for each pool managed by the QMT.
34  *
35  * Each pool can have different quota types enforced (typically user & group
36  * quota). A pool is in charge of managing lquota_entry structures for each
37  * quota type. This is done by creating one lquota_entry site per quota
38  * type. A site stores entries in a hash table and read quota settings from disk
39  * when a given ID isn't present in the hash.
40  *
41  * The pool API exported here is the following:
42  * - qmt_pool_init(): initializes the general QMT structures used to manage
43  *                    pools.
44  * - qmt_pool_fini(): frees the structures allocated by qmt_pool_fini().
45  * - qmt_pool_prepare(): sets up the on-disk indexes associated with each pool.
46  * - qmt_pool_new_conn(): is used to create a new slave index file.
47  * - qmt_pool_lqe_lookup(): returns an up-to-date lquota entry associated with
48  *                          a given ID.
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include <obd_class.h>
54 #include <lprocfs_status.h>
55 #include "qmt_internal.h"
56
57 static void qmt_pool_free(const struct lu_env *, struct qmt_pool_info *);
58
59 /*
60  * Static helper functions not used outside the scope of this file
61  */
62
63 /*
64  * Reference counter management for qmt_pool_info structures
65  */
66 static inline void qpi_getref(struct qmt_pool_info *pool)
67 {
68         atomic_inc(&pool->qpi_ref);
69 }
70
71 static inline void qpi_putref(const struct lu_env *env,
72                               struct qmt_pool_info *pool)
73 {
74         LASSERT(atomic_read(&pool->qpi_ref) > 0);
75         if (atomic_dec_and_test(&pool->qpi_ref))
76                 qmt_pool_free(env, pool);
77 }
78
79 static inline void qpi_putref_locked(struct qmt_pool_info *pool)
80 {
81         LASSERT(atomic_read(&pool->qpi_ref) > 1);
82         atomic_dec(&pool->qpi_ref);
83 }
84
85 /*
86  * Hash functions for qmt_pool_info management
87  */
88
89 static unsigned
90 qpi_hash_hash(struct cfs_hash *hs, const void *key, unsigned mask)
91 {
92         return cfs_hash_u32_hash(*((__u32 *)key), mask);
93 }
94
95 static void *qpi_hash_key(struct hlist_node *hnode)
96 {
97         struct qmt_pool_info *pool;
98         pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
99         return &pool->qpi_key;
100 }
101
102 static int qpi_hash_keycmp(const void *key, struct hlist_node *hnode)
103 {
104         struct qmt_pool_info *pool;
105         pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
106         return pool->qpi_key == *((__u32 *)key);
107 }
108
109 static void *qpi_hash_object(struct hlist_node *hnode)
110 {
111         return hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
112 }
113
114 static void qpi_hash_get(struct cfs_hash *hs, struct hlist_node *hnode)
115 {
116         struct qmt_pool_info *pool;
117         pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
118         qpi_getref(pool);
119 }
120
121 static void qpi_hash_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
122 {
123         struct qmt_pool_info *pool;
124         pool = hlist_entry(hnode, struct qmt_pool_info, qpi_hash);
125         qpi_putref_locked(pool);
126 }
127
128 static void qpi_hash_exit(struct cfs_hash *hs, struct hlist_node *hnode)
129 {
130         CERROR("Should not have any item left!\n");
131 }
132
133 /* vector of hash operations */
134 static struct cfs_hash_ops qpi_hash_ops = {
135         .hs_hash        = qpi_hash_hash,
136         .hs_key         = qpi_hash_key,
137         .hs_keycmp      = qpi_hash_keycmp,
138         .hs_object      = qpi_hash_object,
139         .hs_get         = qpi_hash_get,
140         .hs_put_locked  = qpi_hash_put_locked,
141         .hs_exit        = qpi_hash_exit
142 };
143
144 /* some procfs helpers */
145 static int qpi_state_seq_show(struct seq_file *m, void *data)
146 {
147         struct qmt_pool_info    *pool = m->private;
148         int                      type;
149
150         LASSERT(pool != NULL);
151
152         seq_printf(m, "pool:\n"
153                    "    id: %u\n"
154                    "    type: %s\n"
155                    "    ref: %d\n"
156                    "    least qunit: %lu\n",
157                    pool->qpi_key & 0x0000ffff,
158                    RES_NAME(pool->qpi_key >> 16),
159                    atomic_read(&pool->qpi_ref),
160                    pool->qpi_least_qunit);
161
162         for (type = 0; type < LL_MAXQUOTAS; type++)
163                 seq_printf(m, "    %s:\n"
164                            "        #slv: %d\n"
165                            "        #lqe: %d\n",
166                            qtype_name(type),
167                            pool->qpi_slv_nr[type],
168                     atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
169
170         return 0;
171 }
172 LPROC_SEQ_FOPS_RO(qpi_state);
173
174 static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
175 {
176         struct qmt_pool_info    *pool = m->private;
177         LASSERT(pool != NULL);
178
179         seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
180         return 0;
181 }
182
183 static ssize_t
184 qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
185                                size_t count, loff_t *off)
186 {
187         struct qmt_pool_info    *pool;
188         int     qunit, rc;
189         s64     least_qunit;
190
191         pool = ((struct seq_file *)file->private_data)->private;
192         LASSERT(pool != NULL);
193
194         /* Not tuneable for inode limit */
195         if (pool->qpi_key >> 16 != LQUOTA_RES_DT)
196                 return -EINVAL;
197
198         rc = lprocfs_str_to_s64(buffer, count, &least_qunit);
199         if (rc)
200                 return rc;
201
202         /* Miminal qpi_soft_least_qunit */
203         qunit = pool->qpi_least_qunit << 2;
204         /* The value must be power of miminal qpi_soft_least_qunit, see
205          * how the qunit is adjusted in qmt_adjust_qunit(). */
206         while (qunit > 0 && qunit < least_qunit)
207                 qunit <<= 2;
208         if (qunit <= 0)
209                 qunit = INT_MAX & ~3;
210
211         pool->qpi_soft_least_qunit = qunit;
212         return count;
213 }
214 LPROC_SEQ_FOPS(qpi_soft_least_qunit);
215
216 static struct lprocfs_vars lprocfs_quota_qpi_vars[] = {
217         { .name =       "info",
218           .fops =       &qpi_state_fops },
219         { .name =       "soft_least_qunit",
220           .fops =       &qpi_soft_least_qunit_fops },
221         { NULL }
222 };
223
224 /*
225  * Allocate a new qmt_pool_info structure and add it to the pool hash table
226  * of the qmt.
227  *
228  * \param env       - is the environment passed by the caller
229  * \param qmt       - is the quota master target
230  * \param pool_id   - is the 16-bit pool identifier of the new pool to add
231  * \param pool_type - is the resource type of this pool instance, either
232  *                    LQUOTA_RES_MD or LQUOTA_RES_DT.
233  *
234  * \retval - 0 on success, appropriate error on failure
235  */
236 static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
237                           int pool_id, int pool_type)
238 {
239         struct qmt_thread_info  *qti = qmt_info(env);
240         struct qmt_pool_info    *pool;
241         int                      rc = 0;
242         ENTRY;
243
244         OBD_ALLOC_PTR(pool);
245         if (pool == NULL)
246                 RETURN(-ENOMEM);
247         INIT_LIST_HEAD(&pool->qpi_linkage);
248
249         /* assign key used by hash functions */
250         pool->qpi_key = pool_id + (pool_type << 16);
251
252         /* initialize refcount to 1, hash table will then grab an additional
253          * reference */
254         atomic_set(&pool->qpi_ref, 1);
255
256         /* set up least qunit size to use for this pool */
257         pool->qpi_least_qunit = LQUOTA_LEAST_QUNIT(pool_type);
258         if (pool_type == LQUOTA_RES_DT)
259                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit << 2;
260         else
261                 pool->qpi_soft_least_qunit = pool->qpi_least_qunit;
262
263         /* create pool proc directory */
264         sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
265         pool->qpi_proc = lprocfs_register(qti->qti_buf, qmt->qmt_proc,
266                                           lprocfs_quota_qpi_vars, pool);
267         if (IS_ERR(pool->qpi_proc)) {
268                 rc = PTR_ERR(pool->qpi_proc);
269                 CERROR("%s: failed to create proc entry for pool %s (%d)\n",
270                        qmt->qmt_svname, qti->qti_buf, rc);
271                 pool->qpi_proc = NULL;
272                 GOTO(out, rc);
273         }
274
275         /* grab reference on master target that this pool belongs to */
276         lu_device_get(qmt2lu_dev(qmt));
277         lu_ref_add(&qmt2lu_dev(qmt)->ld_reference, "pool", pool);
278         pool->qpi_qmt = qmt;
279
280         /* add to qmt hash table */
281         rc = cfs_hash_add_unique(qmt->qmt_pool_hash, &pool->qpi_key,
282                                  &pool->qpi_hash);
283         if (rc) {
284                 CERROR("%s: failed to add pool %s to qmt hash (%d)\n",
285                        qmt->qmt_svname, qti->qti_buf, rc);
286                 GOTO(out, rc);
287         }
288
289         /* add to qmt pool list */
290         list_add_tail(&pool->qpi_linkage, &qmt->qmt_pool_list);
291         EXIT;
292 out:
293         if (rc)
294                 /* this frees the pool structure since refcount is equal to 1 */
295                 qpi_putref(env, pool);
296         return rc;
297 }
298
299 /*
300  * Delete a qmt_pool_info instance and all structures associated.
301  *
302  * \param env  - is the environment passed by the caller
303  * \param pool - is the qmt_pool_info structure to free
304  */
305 static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
306 {
307         int     qtype;
308         ENTRY;
309
310         /* release proc entry */
311         if (pool->qpi_proc) {
312                 lprocfs_remove(&pool->qpi_proc);
313                 pool->qpi_proc = NULL;
314         }
315
316         /* release per-quota type site used to manage quota entries as well as
317          * references to global index files */
318         for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
319                 /* release lqe storing grace time */
320                 if (pool->qpi_grace_lqe[qtype] != NULL)
321                         lqe_putref(pool->qpi_grace_lqe[qtype]);
322
323                 /* release site */
324                 if (pool->qpi_site[qtype] != NULL &&
325                     !IS_ERR(pool->qpi_site[qtype]))
326                         lquota_site_free(env, pool->qpi_site[qtype]);
327                 /* release reference to global index */
328                 if (pool->qpi_glb_obj[qtype] != NULL &&
329                     !IS_ERR(pool->qpi_glb_obj[qtype]))
330                         dt_object_put(env, pool->qpi_glb_obj[qtype]);
331         }
332
333         /* release reference on pool directory */
334         if (pool->qpi_root != NULL && !IS_ERR(pool->qpi_root))
335                 dt_object_put(env, pool->qpi_root);
336
337         /* release reference on the master target */
338         if (pool->qpi_qmt != NULL) {
339                 struct lu_device *ld = qmt2lu_dev(pool->qpi_qmt);
340
341                 lu_device_put(ld);
342                 lu_ref_del(&ld->ld_reference, "pool", pool);
343                 pool->qpi_qmt = NULL;
344         }
345
346         LASSERT(list_empty(&pool->qpi_linkage));
347         OBD_FREE_PTR(pool);
348 }
349
350 /*
351  * Look-up a pool in the hash table based on the pool ID and type.
352  *
353  * \param env     - is the environment passed by the caller
354  * \param qmt     - is the quota master target
355  * \param pool_id   - is the 16-bit identifier of the pool to look up
356  * \param pool_type - is the type of this pool, either LQUOTA_RES_MD or
357  *                    LQUOTA_RES_DT.
358  */
359 static struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
360                                              struct qmt_device *qmt,
361                                              int pool_id, int pool_type)
362 {
363         struct qmt_pool_info    *pool;
364         __u32                    key;
365         ENTRY;
366
367         LASSERT(qmt->qmt_pool_hash != NULL);
368
369         /* look-up pool in hash table */
370         key = pool_id + (pool_type << 16);
371         pool = cfs_hash_lookup(qmt->qmt_pool_hash, (void *)&key);
372         if (pool == NULL) {
373                 /* this qmt isn't managing this pool! */
374                 CERROR("%s: looking up quota entry for a pool (0x%x/%d) which "
375                        "isn't managed by this quota master target\n",
376                        qmt->qmt_svname, pool_id, pool_type);
377                 RETURN(ERR_PTR(-ENOENT));
378         }
379         RETURN(pool);
380 }
381
382 /*
383  * Functions implementing the pool API, used by the qmt handlers
384  */
385
386 /*
387  * Destroy all pools which are still in the hash table and free the pool
388  * hash table.
389  *
390  * \param env - is the environment passed by the caller
391  * \param qmt - is the quota master target
392  *
393  */
394 void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
395 {
396         struct qmt_pool_info    *pool;
397         struct list_head        *pos, *n;
398         ENTRY;
399
400         if (qmt->qmt_pool_hash == NULL)
401                 RETURN_EXIT;
402
403         /* parse list of pool and destroy each element */
404         list_for_each_safe(pos, n, &qmt->qmt_pool_list) {
405                 pool = list_entry(pos, struct qmt_pool_info,
406                                   qpi_linkage);
407                 /* remove from hash */
408                 cfs_hash_del(qmt->qmt_pool_hash, &pool->qpi_key,
409                              &pool->qpi_hash);
410
411                 /* remove from list */
412                 list_del_init(&pool->qpi_linkage);
413
414                 /* release extra reference taken in qmt_pool_alloc */
415                 qpi_putref(env, pool);
416         }
417         LASSERT(list_empty(&qmt->qmt_pool_list));
418
419         cfs_hash_putref(qmt->qmt_pool_hash);
420         qmt->qmt_pool_hash = NULL;
421         EXIT;
422 }
423
424 /*
425  * Initialize pool configure for the quota master target. For now, we only
426  * support the default data (i.e. all OSTs) and metadata (i.e. all the MDTs)
427  * pool which are instantiated in this function.
428  *
429  * \param env - is the environment passed by the caller
430  * \param qmt - is the quota master target for which we have to initialize the
431  *              pool configuration
432  *
433  * \retval - 0 on success, appropriate error on failure
434  */
435 int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
436 {
437         int     res, rc = 0;
438         ENTRY;
439
440         /* initialize pool hash table */
441         qmt->qmt_pool_hash = cfs_hash_create("POOL_HASH",
442                                              HASH_POOLS_CUR_BITS,
443                                              HASH_POOLS_MAX_BITS,
444                                              HASH_POOLS_BKT_BITS, 0,
445                                              CFS_HASH_MIN_THETA,
446                                              CFS_HASH_MAX_THETA,
447                                              &qpi_hash_ops,
448                                              CFS_HASH_DEFAULT);
449         if (qmt->qmt_pool_hash == NULL) {
450                 CERROR("%s: failed to create pool hash table\n",
451                        qmt->qmt_svname);
452                 RETURN(-ENOMEM);
453         }
454
455         /* initialize pool list */
456         INIT_LIST_HEAD(&qmt->qmt_pool_list);
457
458         /* Instantiate pool master for the default data and metadata pool (both
459          * have pool ID equals to 0).
460          * This code will have to be revisited once we support quota on
461          * non-default pools */
462         for (res = LQUOTA_FIRST_RES; res < LQUOTA_LAST_RES; res++) {
463                 rc = qmt_pool_alloc(env, qmt, 0, res);
464                 if (rc)
465                         break;
466         }
467
468         if (rc)
469                 qmt_pool_fini(env, qmt);
470
471         RETURN(rc);
472 }
473
474 static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
475                        char *slv_name, struct lu_fid *slv_fid, void *arg)
476 {
477         int *nr = arg;
478
479         /* one more slave */
480         (*nr)++;
481
482         return 0;
483 }
484
485 /*
486  * Set up on-disk index files associated with each pool.
487  *
488  * \param env - is the environment passed by the caller
489  * \param qmt - is the quota master target for which we have to initialize the
490  *              pool configuration
491  * \param qmt_root - is the on-disk directory created for the QMT.
492  *
493  * \retval - 0 on success, appropriate error on failure
494  */
495 int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
496                      struct dt_object *qmt_root)
497 {
498         struct qmt_thread_info  *qti = qmt_info(env);
499         struct lquota_glb_rec   *rec = &qti->qti_glb_rec;
500         struct qmt_pool_info    *pool;
501         struct dt_device        *dev = NULL;
502         dt_obj_version_t         version;
503         struct list_head        *pos;
504         int                      rc = 0, qtype;
505         ENTRY;
506
507         LASSERT(qmt->qmt_pool_hash != NULL);
508
509         /* iterate over each pool in the hash and allocate a quota site for each
510          * one. This involves creating a global index file on disk */
511         list_for_each(pos, &qmt->qmt_pool_list) {
512                 struct dt_object        *obj;
513                 int                      pool_type, pool_id;
514                 struct lquota_entry     *lqe;
515
516                 pool = list_entry(pos, struct qmt_pool_info,
517                                   qpi_linkage);
518
519                 pool_id   = pool->qpi_key & 0x0000ffff;
520                 pool_type = pool->qpi_key >> 16;
521                 if (dev == NULL)
522                         dev = pool->qpi_qmt->qmt_child;
523
524                 /* allocate directory for this pool */
525                 sprintf(qti->qti_buf, "%s-0x%x", RES_NAME(pool_type), pool_id);
526                 obj = lquota_disk_dir_find_create(env, qmt->qmt_child, qmt_root,
527                                                   qti->qti_buf);
528                 if (IS_ERR(obj))
529                         RETURN(PTR_ERR(obj));
530                 pool->qpi_root = obj;
531
532                 for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
533                         /* Generating FID of global index in charge of storing
534                          * settings for this quota type */
535                         lquota_generate_fid(&qti->qti_fid, pool_id, pool_type,
536                                             qtype);
537
538                         /* open/create the global index file for this quota
539                          * type */
540                         obj = lquota_disk_glb_find_create(env, dev,
541                                                           pool->qpi_root,
542                                                           &qti->qti_fid, false);
543                         if (IS_ERR(obj)) {
544                                 rc = PTR_ERR(obj);
545                                 CERROR("%s: failed to create glb index copy for %s type: rc = %d\n",
546                                        qmt->qmt_svname, qtype_name(qtype), rc);
547                                 RETURN(rc);
548                         }
549
550                         pool->qpi_glb_obj[qtype] = obj;
551
552                         version = dt_version_get(env, obj);
553                         /* set default grace time for newly created index */
554                         if (version == 0) {
555                                 rec->qbr_hardlimit = 0;
556                                 rec->qbr_softlimit = 0;
557                                 rec->qbr_granted = 0;
558                                 rec->qbr_time = pool_type == LQUOTA_RES_MD ?
559                                         MAX_IQ_TIME : MAX_DQ_TIME;
560
561                                 rc = lquota_disk_write_glb(env, obj, 0, rec);
562                                 if (rc) {
563                                         CERROR("%s: failed to set default grace time for %s type: rc = %d\n",
564                                                qmt->qmt_svname, qtype_name(qtype), rc);
565                                         RETURN(rc);
566                                 }
567
568                                 rc = lquota_disk_update_ver(env, dev, obj, 1);
569                                 if (rc) {
570                                         CERROR("%s: failed to set initial version for %s type: rc = %d\n",
571                                                qmt->qmt_svname, qtype_name(qtype), rc);
572                                         RETURN(rc);
573                                 }
574                         }
575
576                         /* create quota entry site for this quota type */
577                         pool->qpi_site[qtype] = lquota_site_alloc(env, pool,
578                                                                   true, qtype,
579                                                                   &qmt_lqe_ops);
580                         if (IS_ERR(pool->qpi_site[qtype])) {
581                                 rc = PTR_ERR(pool->qpi_site[qtype]);
582                                 CERROR("%s: failed to create site for %s type: rc = %d\n",
583                                        qmt->qmt_svname, qtype_name(qtype), rc);
584                                 RETURN(rc);
585                         }
586
587                         /* count number of slaves which already connected to
588                          * the master in the past */
589                         pool->qpi_slv_nr[qtype] = 0;
590                         rc = lquota_disk_for_each_slv(env, pool->qpi_root,
591                                                       &qti->qti_fid,
592                                                       qmt_slv_cnt,
593                                                       &pool->qpi_slv_nr[qtype]);
594                         if (rc) {
595                                 CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
596                                        qmt->qmt_svname, qtype_name(qtype), rc);
597                                 RETURN(rc);
598                         }
599
600                         /* Global grace time is stored in quota settings of
601                          * ID 0. */
602                         qti->qti_id.qid_uid = 0;
603
604                         /* look-up quota entry storing grace time */
605                         lqe = lqe_locate(env, pool->qpi_site[qtype],
606                                          &qti->qti_id);
607                         if (IS_ERR(lqe))
608                                 RETURN(PTR_ERR(lqe));
609                         pool->qpi_grace_lqe[qtype] = lqe;
610 #ifdef CONFIG_PROC_FS
611                         /* add procfs file to dump the global index, mostly for
612                          * debugging purpose */
613                         snprintf(qti->qti_buf, MTI_NAME_MAXLEN,
614                                  "glb-%s", qtype_name(qtype));
615                         rc = lprocfs_seq_create(pool->qpi_proc, qti->qti_buf,
616                                                 0444, &lprocfs_quota_seq_fops,
617                                                 obj);
618                         if (rc)
619                                 CWARN("%s: Error adding procfs file for global"
620                                       "quota index "DFID", rc:%d\n",
621                                       qmt->qmt_svname, PFID(&qti->qti_fid), rc);
622 #endif
623                 }
624         }
625
626         RETURN(0);
627 }
628
629 /*
630  * Handle new slave connection. Called when a slave enqueues the global quota
631  * lock at the beginning of the reintegration procedure.
632  *
633  * \param env - is the environment passed by the caller
634  * \parap qmt - is the quota master target handling this request
635  * \param glb_fid - is the fid of the global index file
636  * \param slv_fid - is the fid of the newly created slave index file
637  * \param slv_ver - is the current version of the slave index file
638  * \param uuid    - is the uuid of slave which is (re)connecting to the master
639  *                  target
640  *
641  * \retval - 0 on success, appropriate error on failure
642  */
643 int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
644                       struct lu_fid *glb_fid, struct lu_fid *slv_fid,
645                       __u64 *slv_ver, struct obd_uuid *uuid)
646 {
647         struct qmt_pool_info    *pool;
648         struct dt_object        *slv_obj;
649         int                      pool_id, pool_type, qtype;
650         bool                     created = false;
651         int                      rc = 0;
652
653         /* extract pool info from global index FID */
654         rc = lquota_extract_fid(glb_fid, &pool_id, &pool_type, &qtype);
655         if (rc)
656                 RETURN(rc);
657
658         /* look-up pool in charge of this global index FID */
659         pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
660         if (IS_ERR(pool))
661                 RETURN(PTR_ERR(pool));
662
663         /* look-up slave index file */
664         slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, pool->qpi_root,
665                                        glb_fid, uuid);
666         if (IS_ERR(slv_obj) && PTR_ERR(slv_obj) == -ENOENT) {
667                 /* create slave index file */
668                 slv_obj = lquota_disk_slv_find_create(env, qmt->qmt_child,
669                                                       pool->qpi_root, glb_fid,
670                                                       uuid, false);
671                 created = true;
672         }
673         if (IS_ERR(slv_obj)) {
674                 rc = PTR_ERR(slv_obj);
675                 CERROR("%s: failed to create quota slave index file for %s (%d)"
676                        "\n", qmt->qmt_svname, obd_uuid2str(uuid), rc);
677                 GOTO(out, rc);
678         }
679
680         /* retrieve slave fid & current object version */
681         memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
682         *slv_ver = dt_version_get(env, slv_obj);
683         dt_object_put(env, slv_obj);
684         if (created)
685                 pool->qpi_slv_nr[qtype]++;
686 out:
687         qpi_putref(env, pool);
688         RETURN(rc);
689 }
690
691 /*
692  * Look-up a lquota_entry in the pool hash and allocate it if not found.
693  *
694  * \param env - is the environment passed by the caller
695  * \param qmt - is the quota master target for which we have to initialize the
696  *              pool configuration
697  * \param pool_id   - is the 16-bit identifier of the pool
698  * \param pool_type - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT.
699  * \param qtype     - is the quota type, either user or group.
700  * \param qid       - is the quota ID to look-up
701  *
702  * \retval - valid pointer to lquota entry on success, appropriate error on
703  *           failure
704  */
705 struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
706                                          struct qmt_device *qmt,
707                                          int pool_id, int pool_type,
708                                          int qtype, union lquota_id *qid)
709 {
710         struct qmt_pool_info    *pool;
711         struct lquota_entry     *lqe;
712         ENTRY;
713
714         /* look-up pool responsible for this global index FID */
715         pool = qmt_pool_lookup(env, qmt, pool_id, pool_type);
716         if (IS_ERR(pool))
717                 RETURN((void *)pool);
718
719         if (qid->qid_uid == 0) {
720                 /* caller wants to access grace time, no need to look up the
721                  * entry since we keep a reference on ID 0 all the time */
722                 lqe = pool->qpi_grace_lqe[qtype];
723                 lqe_getref(lqe);
724                 GOTO(out, lqe);
725         }
726
727         /* now that we have the pool, let's look-up the quota entry in the
728          * right quota site */
729         lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
730 out:
731         qpi_putref(env, pool);
732         RETURN(lqe);
733 }