Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / quota / lquota_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/version.h>
34 #include <linux/module.h>
35 #include <linux/init.h>
36
37 #include "lquota_internal.h"
38
39 struct kmem_cache *lqe_kmem;
40
41 struct lu_kmem_descr lquota_caches[] = {
42         {
43                 .ckd_cache = &lqe_kmem,
44                 .ckd_name  = "lqe_kmem",
45                 .ckd_size  = sizeof(struct lquota_entry)
46         },
47         {
48                 .ckd_cache = NULL
49         }
50 };
51
52 /* register lquota key */
53 LU_KEY_INIT_FINI(lquota, struct lquota_thread_info);
54 LU_CONTEXT_KEY_DEFINE(lquota, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL);
55 LU_KEY_INIT_GENERIC(lquota);
56
57 static inline __u32 qtype2acct_oid(int qtype)
58 {
59         switch (qtype) {
60         case USRQUOTA:
61                 return ACCT_USER_OID;
62         case GRPQUOTA:
63                 return ACCT_GROUP_OID;
64         case PRJQUOTA:
65                 return ACCT_PROJECT_OID;
66         }
67
68         return ACCT_GROUP_OID;
69 }
70
71 /**
72  * Look-up accounting object to collect space usage information for user
73  * or group.
74  *
75  * \param env  - is the environment passed by the caller
76  * \param dev  - is the dt_device storing the accounting object
77  * \param type - is the quota type, either USRQUOTA or GRPQUOTA
78  */
79 struct dt_object *acct_obj_lookup(const struct lu_env *env,
80                                   struct dt_device *dev, int type)
81 {
82         struct lquota_thread_info       *qti = lquota_info(env);
83         struct dt_object                *obj = NULL;
84         ENTRY;
85
86         lu_local_obj_fid(&qti->qti_fid, qtype2acct_oid(type));
87
88         /* lookup the accounting object */
89         obj = dt_locate(env, dev, &qti->qti_fid);
90         if (IS_ERR(obj))
91                 RETURN(obj);
92
93         if (!dt_object_exists(obj)) {
94                 dt_object_put(env, obj);
95                 RETURN(ERR_PTR(-ENOENT));
96         }
97
98         if (obj->do_index_ops == NULL) {
99                 int rc;
100
101                 /* set up indexing operations */
102                 rc = obj->do_ops->do_index_try(env, obj, &dt_acct_features);
103                 if (rc) {
104                         CERROR("%s: failed to set up indexing operations for %s"
105                                " acct object rc:%d\n",
106                                dev->dd_lu_dev.ld_obd->obd_name,
107                                qtype_name(type), rc);
108                         dt_object_put(env, obj);
109                         RETURN(ERR_PTR(rc));
110                 }
111         }
112         RETURN(obj);
113 }
114
115 /**
116  * Initialize slave index object to collect local quota limit for user or group.
117  *
118  * \param env - is the environment passed by the caller
119  * \param dev - is the dt_device storing the slave index object
120  * \param pool - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT
121  * \param type - is the quota type, either USRQUOTA or GRPQUOTA
122  */
123 static struct dt_object *quota_obj_lookup(const struct lu_env *env,
124                                           struct dt_device *dev, int pool,
125                                           int type)
126 {
127         struct lquota_thread_info       *qti = lquota_info(env);
128         struct dt_object                *obj = NULL;
129         int                              is_md;
130         ENTRY;
131
132         is_md = lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev);
133         if ((is_md && pool == LQUOTA_RES_MD) ||
134             (!is_md && pool == LQUOTA_RES_DT))
135                 qti->qti_fid.f_oid = qtype2slv_oid(type);
136         else
137                 qti->qti_fid.f_oid = pool << 16 | qtype2slv_oid(type);
138
139         qti->qti_fid.f_seq = FID_SEQ_QUOTA;
140         qti->qti_fid.f_ver = 0;
141
142         /* lookup the quota object */
143         obj = dt_locate(env, dev, &qti->qti_fid);
144         if (IS_ERR(obj))
145                 RETURN(obj);
146
147         if (!dt_object_exists(obj)) {
148                 dt_object_put(env, obj);
149                 RETURN(ERR_PTR(-ENOENT));
150         }
151
152         if (obj->do_index_ops == NULL) {
153                 int rc;
154
155                 /* set up indexing operations */
156                 rc = obj->do_ops->do_index_try(env, obj,
157                                                &dt_quota_slv_features);
158                 if (rc) {
159                         CERROR("%s: failed to set up indexing operations for %s"
160                                " slave index object rc:%d\n",
161                                dev->dd_lu_dev.ld_obd->obd_name,
162                                qtype_name(type), rc);
163                         dt_object_put(env, obj);
164                         RETURN(ERR_PTR(rc));
165                 }
166         }
167         RETURN(obj);
168 }
169
170 /*
171  * Iterate quota settings managed by \a obj.
172  *
173  * \param env     - is the environment passed by the caller
174  * \param dev     - is the backend device holding the quota object
175  * \param obj     - is the quota object to be iterated
176  * \param oqctl   - is the quota ioctl object passed in by caller
177  * \param buf     - is the buffer to save the retrieved quota settings
178  * \param size    - is the size of the buffer
179  * \param is_glb  - true to iterate the global quota settings
180  * \param is_md   - true to iterate LQUOTA_MD quota settings
181  */
182 int lquota_obj_iter(const struct lu_env *env, struct dt_device *dev,
183                     struct dt_object *obj, struct obd_quotactl *oqctl,
184                     char *buf, int size, bool is_glb, bool is_md)
185 {
186         struct lquota_thread_info *qti = lquota_info(env);
187         const struct dt_it_ops *iops;
188         struct dt_it *it;
189         struct dt_key *key;
190         struct dt_rec *rec = (struct dt_rec *)&qti->qti_rec;
191         __u64 offset;
192         bool skip = true;
193         int cur = 0, rc;
194         int rec_size;
195
196         ENTRY;
197
198         iops = &obj->do_index_ops->dio_it;
199         it = iops->init(env, obj, 0);
200         if (IS_ERR(it)) {
201                 rc = PTR_ERR(it);
202                 CERROR("%s: failed to initialize iterator: rc = %ld\n",
203                        obj->do_lu.lo_dev->ld_obd->obd_name, PTR_ERR(it));
204                 RETURN(rc);
205         }
206
207         rc = iops->load(env, it, 0);
208         if (rc <= 0) {
209                 if (is_md)
210                         oqctl->qc_iter_md_offset = 0;
211                 else
212                         oqctl->qc_iter_dt_offset = 0;
213
214                 GOTO(out_fini, rc);
215         }
216
217         if ((is_md && oqctl->qc_iter_md_offset == 0) ||
218             (!is_md && oqctl->qc_iter_dt_offset == 0))
219                 skip = false;
220
221         if (is_glb)
222                 rec_size = sizeof(struct lquota_glb_rec);
223         else
224                 rec_size = sizeof(struct lquota_acct_rec);
225
226         if (is_md)
227                 offset = oqctl->qc_iter_md_offset;
228         else
229                 offset = oqctl->qc_iter_dt_offset;
230
231         while ((size - cur) > (sizeof(__u64) + rec_size)) {
232                 if (!skip)
233                         goto get_setting;
234
235                 if (offset == iops->store(env, it))
236                         skip = false;
237                 else {
238                         rc = iops->next(env, it);
239                         if (rc < 0) {
240                                 CERROR("%s: next failed: rc = %d\n",
241                                        obj->do_lu.lo_dev->ld_obd->obd_name, rc);
242                                 break;
243                         }
244
245                         /* reach the end */
246                         if (rc > 0) {
247                                 if (is_md)
248                                         oqctl->qc_iter_md_offset = 0;
249                                 else
250                                         oqctl->qc_iter_dt_offset = 0;
251
252                                 break;
253                         }
254
255                         continue;
256                 }
257
258 get_setting:
259                 key = iops->key(env, it);
260                 if (IS_ERR(key)) {
261                         CERROR("%s: failed to get key: rc = %ld\n",
262                                obj->do_lu.lo_dev->ld_obd->obd_name,
263                         PTR_ERR(key));
264
265                         GOTO(out_fini, rc = PTR_ERR(key));
266                 }
267
268                 rc = iops->rec(env, it, rec, 0);
269                 if (rc) {
270                         CERROR("%s: failed to get rec: rc = %d\n",
271                                obj->do_lu.lo_dev->ld_obd->obd_name, rc);
272                         GOTO(out_fini, rc);
273                 }
274
275                 if (oqctl->qc_iter_qid_end != 0 &&
276                     (*((__u64 *)key) < oqctl->qc_iter_qid_start ||
277                      *((__u64 *)key) > oqctl->qc_iter_qid_end))
278                         goto next;
279
280                 memcpy(buf + cur, key, sizeof(__u64));
281                 cur += sizeof(__u64);
282
283                 memcpy(buf + cur, rec, rec_size);
284                 cur += rec_size;
285
286 next:
287                 rc = iops->next(env, it);
288                 if (rc < 0) {
289                         CERROR("%s: next failed: rc = %d\n",
290                                obj->do_lu.lo_dev->ld_obd->obd_name, rc);
291
292                         GOTO(out_fini, rc);
293                 }
294
295                 /* reach the end */
296                 if (rc > 0) {
297                         if (is_md)
298                                 oqctl->qc_iter_md_offset = 0;
299                         else
300                                 oqctl->qc_iter_dt_offset = 0;
301
302                         break;
303                 }
304
305                 if (is_md)
306                         oqctl->qc_iter_md_offset = iops->store(env, it);
307                 else
308                         oqctl->qc_iter_dt_offset = iops->store(env, it);
309         }
310
311 out_fini:
312         if (rc >= 0) {
313                 if (is_md)
314                         oqctl->qc_iter_md_buflen = cur;
315                 else
316                         oqctl->qc_iter_dt_buflen = cur;
317
318                 rc = 0;
319         }
320
321         iops->put(env, it);
322         iops->fini(env, it);
323         return rc < 0 ? rc : 0;
324 }
325
326 /*
327  * Helper routine to retrieve slave information.
328  * This function converts a quotactl request into quota/accounting object
329  * operations. It is independant of the slave stack which is only accessible
330  * from the OSD layer.
331  *
332  * \param env   - is the environment passed by the caller
333  * \param dev   - is the dt_device this quotactl is executed on
334  * \param oqctl - is the quotactl request
335  */
336 int lquotactl_slv(const struct lu_env *env, struct dt_device *dev,
337                   struct obd_quotactl *oqctl, char *buffer, int size)
338 {
339         struct lquota_thread_info *qti = lquota_info(env);
340         __u64                            key;
341         struct dt_object                *obj, *obj_aux = NULL;
342         struct obd_dqblk                *dqblk = &oqctl->qc_dqblk;
343         int                              rc;
344         ENTRY;
345
346         if (oqctl->qc_cmd != Q_GETOQUOTA &&
347             oqctl->qc_cmd != LUSTRE_Q_ITEROQUOTA) {
348                 /* as in many other places, dev->dd_lu_dev.ld_obd->obd_name
349                  * point to an invalid obd_name, to be fixed in LU-1574 */
350                 CERROR("%s: Unsupported quotactl command: %x\n",
351                        dev->dd_lu_dev.ld_obd->obd_name, oqctl->qc_cmd);
352                 RETURN(-EOPNOTSUPP);
353         }
354
355         if (oqctl->qc_type < 0 || oqctl->qc_type >= LL_MAXQUOTAS)
356                 RETURN(-EOPNOTSUPP);
357
358         /* qc_id is a 32-bit field while a key has 64 bits */
359         key = oqctl->qc_id;
360
361         /* Step 1: collect accounting information */
362
363         obj = acct_obj_lookup(env, dev, oqctl->qc_type);
364         if (IS_ERR(obj))
365                 RETURN(-EOPNOTSUPP);
366         if (obj->do_index_ops == NULL)
367                 GOTO(out, rc = -EINVAL);
368
369         if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) {
370                 if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev))
371                         rc = lquota_obj_iter(env, dev, obj, oqctl, buffer, size,
372                                          false, true);
373                 else
374                         rc = lquota_obj_iter(env, dev, obj, oqctl, buffer, size,
375                                          false, false);
376
377                 GOTO(out, rc);
378         }
379
380         /* lookup record storing space accounting information for this ID */
381         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_acct_rec,
382                        (struct dt_key *)&key);
383         if (rc < 0)
384                 GOTO(out, rc);
385
386         memset(&oqctl->qc_dqblk, 0, sizeof(struct obd_dqblk));
387         dqblk->dqb_curspace     = qti->qti_acct_rec.bspace;
388         dqblk->dqb_curinodes    = qti->qti_acct_rec.ispace;
389         dqblk->dqb_valid        = QIF_USAGE;
390
391         dt_object_put(env, obj);
392
393         /* Step 2: collect enforcement information */
394
395         if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev))
396                 obj = quota_obj_lookup(env, dev, LQUOTA_RES_MD, oqctl->qc_type);
397         else
398                 obj = quota_obj_lookup(env, dev, LQUOTA_RES_DT, oqctl->qc_type);
399
400         if (IS_ERR(obj))
401                 RETURN(0);
402         if (obj->do_index_ops == NULL)
403                 GOTO(out, rc = 0);
404
405         memset(&qti->qti_slv_rec, 0, sizeof(qti->qti_slv_rec));
406         /* lookup record storing enforcement information for this ID */
407         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_slv_rec,
408                        (struct dt_key *)&key);
409         if (rc < 0 && rc != -ENOENT)
410                 GOTO(out, rc = 0);
411
412         if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev)) {
413                 dqblk->dqb_ihardlimit = qti->qti_slv_rec.qsr_granted;
414                 dqblk->dqb_bhardlimit = 0;
415
416                 obj_aux = quota_obj_lookup(env, dev, LQUOTA_RES_DT,
417                                            oqctl->qc_type);
418                 if (IS_ERR(obj_aux)) {
419                         obj_aux = NULL;
420                         GOTO(out, rc = 0);
421                 }
422
423                 if (obj_aux->do_index_ops == NULL)
424                         GOTO(out, rc = 0);
425
426                 memset(&qti->qti_slv_rec, 0, sizeof(qti->qti_slv_rec));
427                 rc = dt_lookup(env, obj_aux, (struct dt_rec *)&qti->qti_slv_rec,
428                                (struct dt_key *)&key);
429                 if (rc < 0 && rc != -ENOENT)
430                         GOTO(out, rc = 0);
431
432                 dqblk->dqb_bhardlimit = qti->qti_slv_rec.qsr_granted;
433         } else {
434                 dqblk->dqb_ihardlimit = 0;
435                 dqblk->dqb_bhardlimit = qti->qti_slv_rec.qsr_granted;
436         }
437         dqblk->dqb_valid |= QIF_LIMITS;
438
439         GOTO(out, rc = 0);
440 out:
441         dt_object_put(env, obj);
442         if (obj_aux != NULL)
443                 dt_object_put(env, obj_aux);
444         return rc;
445 }
446 EXPORT_SYMBOL(lquotactl_slv);
447
448 static inline __u8 qtype2lqtype(int qtype)
449 {
450         switch (qtype) {
451         case USRQUOTA:
452                 return LQUOTA_TYPE_USR;
453         case GRPQUOTA:
454                 return LQUOTA_TYPE_GRP;
455         case PRJQUOTA:
456                 return LQUOTA_TYPE_PRJ;
457         }
458
459         return LQUOTA_TYPE_GRP;
460 }
461
462 static inline int lqtype2qtype(int lqtype)
463 {
464         switch (lqtype) {
465         case LQUOTA_TYPE_USR:
466                 return USRQUOTA;
467         case LQUOTA_TYPE_GRP:
468                 return GRPQUOTA;
469         case LQUOTA_TYPE_PRJ:
470                 return PRJQUOTA;
471         }
472
473         return GRPQUOTA;
474 }
475
476 /**
477  * Helper routine returning the FID associated with the global index storing
478  * quota settings for default storage pool, resource type \pool_type and
479  * the quota type \quota_type.
480  */
481 void lquota_generate_fid(struct lu_fid *fid, int pool_type, int quota_type)
482 {
483         __u8     lqtype = qtype2lqtype(quota_type);
484
485         fid->f_seq = FID_SEQ_QUOTA_GLB;
486         fid->f_oid = (lqtype << 24) | (pool_type << 16);
487         fid->f_ver = 0;
488 }
489
490 /**
491  * Helper routine used to extract pool type and quota type from a
492  * given FID.
493  */
494 int lquota_extract_fid(const struct lu_fid *fid, int *pool_type,
495                        int *quota_type)
496 {
497         unsigned int lqtype;
498         ENTRY;
499
500         if (fid->f_seq != FID_SEQ_QUOTA_GLB)
501                 RETURN(-EINVAL);
502
503         if (pool_type != NULL) {
504                 lqtype = (fid->f_oid >> 16) & 0xffU;
505                 if (lqtype >= LQUOTA_LAST_RES)
506                         RETURN(-ENOTSUPP);
507
508                 *pool_type = lqtype;
509         }
510
511         if (quota_type != NULL) {
512                 lqtype = fid->f_oid >> 24;
513                 if (lqtype >= LQUOTA_TYPE_MAX)
514                         RETURN(-ENOTSUPP);
515
516                 *quota_type = lqtype2qtype(lqtype);
517         }
518
519         RETURN(0);
520 }
521
522 static int __init lquota_init(void)
523 {
524         int     rc;
525         ENTRY;
526
527         rc = libcfs_setup();
528         if (rc)
529                 return rc;
530
531         lquota_key_init_generic(&lquota_thread_key, NULL);
532         lu_context_key_register(&lquota_thread_key);
533
534         rc = lu_kmem_init(lquota_caches);
535         if (rc)
536                 GOTO(out_key, rc);
537
538         rc = qmt_glb_init();
539         if (rc)
540                 GOTO(out_caches, rc);
541
542         rc = qsd_glb_init();
543         if (rc)
544                 GOTO(out_qmt, rc);
545
546         RETURN(0);
547
548 out_qmt:
549         qmt_glb_fini();
550 out_caches:
551         lu_kmem_fini(lquota_caches);
552 out_key:
553         lu_context_key_degister(&lquota_thread_key);
554         return rc;
555 }
556
557 static void __exit lquota_exit(void)
558 {
559         qsd_glb_fini();
560         qmt_glb_fini();
561         lu_kmem_fini(lquota_caches);
562         lu_context_key_degister(&lquota_thread_key);
563 }
564
565 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
566 MODULE_DESCRIPTION("Lustre Quota");
567 MODULE_VERSION(LUSTRE_VERSION_STRING);
568 MODULE_LICENSE("GPL");
569
570 module_init(lquota_init);
571 module_exit(lquota_exit);