Whamcloud - gitweb
LU-4017 quota: cleanup codes of quota for new type
[fs/lustre-release.git] / lustre / osd-zfs / osd_quota.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2015, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann@whamcloud.com>
28  */
29
30 #include <lustre_quota.h>
31 #include <obd.h>
32 #include "osd_internal.h"
33
34 /**
35  * Helper function to retrieve DMU object id from fid for accounting object
36  */
37 inline int osd_quota_fid2dmu(const struct lu_fid *fid, uint64_t *oid)
38 {
39         int rc = 0;
40
41         LASSERT(fid_is_acct(fid));
42         switch (fid_oid(fid)) {
43         case ACCT_GROUP_OID:
44                 *oid = DMU_GROUPUSED_OBJECT;
45                 break;
46         case ACCT_USER_OID:
47                 *oid = DMU_USERUSED_OBJECT;
48                 break;
49         default:
50                 rc = -EINVAL;
51                 break;
52         }
53         return rc;
54 }
55
56 /**
57  * Helper function to estimate the number of inodes in use for a give uid/gid
58  * from the block usage
59  */
60 static uint64_t osd_objset_user_iused(struct osd_device *osd, uint64_t uidbytes)
61 {
62         uint64_t refdbytes, availbytes, usedobjs, availobjs;
63         uint64_t uidobjs, bshift;
64
65         /* get fresh statfs info */
66         dmu_objset_space(osd->od_os, &refdbytes, &availbytes,
67                          &usedobjs, &availobjs);
68
69         /* estimate the number of objects based on the disk usage */
70         bshift = fls64(osd->od_max_blksz) - 1;
71         uidobjs = osd_objs_count_estimate(refdbytes, usedobjs,
72                                           uidbytes >> bshift, bshift);
73         if (uidbytes > 0)
74                 /* if we have at least 1 byte, we have at least one dnode ... */
75                 uidobjs = max_t(uint64_t, uidobjs, 1);
76
77         return uidobjs;
78 }
79
80 /**
81  * Space Accounting Management
82  */
83
84 /**
85  * Return space usage consumed by a given uid or gid.
86  * Block usage is accurrate since it is maintained by DMU itself.
87  * However, DMU does not provide inode accounting, so the #inodes in use
88  * is estimated from the block usage and statfs information.
89  *
90  * \param env   - is the environment passed by the caller
91  * \param dtobj - is the accounting object
92  * \param dtrec - is the record to fill with space usage information
93  * \param dtkey - is the id the of the user or group for which we would
94  *                like to access disk usage.
95  *
96  * \retval +ve - success : exact match
97  * \retval -ve - failure
98  */
99 static int osd_acct_index_lookup(const struct lu_env *env,
100                                 struct dt_object *dtobj,
101                                 struct dt_rec *dtrec,
102                                 const struct dt_key *dtkey)
103 {
104         struct osd_thread_info  *info = osd_oti_get(env);
105         char                    *buf  = info->oti_buf;
106         size_t                   buflen = sizeof(info->oti_buf);
107         struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
108         struct osd_object       *obj = osd_dt_obj(dtobj);
109         struct osd_device       *osd = osd_obj2dev(obj);
110         int                      rc;
111         uint64_t                 oid = 0;
112         ENTRY;
113
114         rec->bspace = rec->ispace = 0;
115
116         /* convert the 64-bit uid/gid into a string */
117         snprintf(buf, buflen, "%llx", *((__u64 *)dtkey));
118         /* fetch DMU object ID (DMU_USERUSED_OBJECT/DMU_GROUPUSED_OBJECT) to be
119          * used */
120         rc = osd_quota_fid2dmu(lu_object_fid(&dtobj->do_lu), &oid);
121         if (rc)
122                 RETURN(rc);
123
124         /* disk usage (in bytes) is maintained by DMU.
125          * DMU_USERUSED_OBJECT/DMU_GROUPUSED_OBJECT are special objects which
126          * not associated with any dmu_but_t (see dnode_special_open()). */
127         rc = zap_lookup(osd->od_os, oid, buf, sizeof(uint64_t), 1,
128                         &rec->bspace);
129         if (rc == -ENOENT) {
130                 /* user/group has not created anything yet */
131                 CDEBUG(D_QUOTA, "%s: id %s not found in DMU accounting ZAP\n",
132                        osd->od_svname, buf);
133         } else if (rc) {
134                 RETURN(rc);
135         }
136
137         if (!osd_dmu_userobj_accounting_available(osd)) {
138                 if (rec->bspace != 0)
139                         /* estimate #inodes in use */
140                         rec->ispace = osd_objset_user_iused(osd, rec->bspace);
141                 rc = 1;
142         } else {
143                 snprintf(buf, buflen, OSD_DMU_USEROBJ_PREFIX "%llx",
144                          *((__u64 *)dtkey));
145                 rc = zap_lookup(osd->od_os, oid, buf, sizeof(uint64_t), 1,
146                                 &rec->ispace);
147                 if (rc == -ENOENT) {
148                         CDEBUG(D_QUOTA,
149                                "%s: id %s not found dnode accounting\n",
150                                osd->od_svname, buf);
151                 } else if (rc == 0) {
152                         rc = 1;
153                 }
154         }
155
156         RETURN(rc);
157 }
158
159 /**
160  * Initialize osd Iterator for given osd index object.
161  *
162  * \param  dt    - osd index object
163  * \param  attr  - not used
164  */
165 static struct dt_it *osd_it_acct_init(const struct lu_env *env,
166                                       struct dt_object *dt,
167                                       __u32 attr)
168 {
169         struct osd_thread_info  *info = osd_oti_get(env);
170         struct osd_it_quota     *it;
171         struct lu_object        *lo   = &dt->do_lu;
172         struct osd_device       *osd  = osd_dev(lo->lo_dev);
173         int                      rc;
174         ENTRY;
175
176         LASSERT(lu_object_exists(lo));
177
178         if (info == NULL)
179                 RETURN(ERR_PTR(-ENOMEM));
180
181         OBD_ALLOC_PTR(it);
182         if (it == NULL)
183                 RETURN(ERR_PTR(-ENOMEM));
184
185         memset(it, 0, sizeof(*it));
186         rc = osd_quota_fid2dmu(lu_object_fid(lo), &it->oiq_oid);
187         if (rc)
188                 RETURN(ERR_PTR(rc));
189
190         /* initialize zap cursor */
191         rc = osd_zap_cursor_init(&it->oiq_zc, osd->od_os, it->oiq_oid, 0);
192         if (rc != 0) {
193                 OBD_FREE_PTR(it);
194                 RETURN(ERR_PTR(rc));
195         }
196
197         /* take object reference */
198         lu_object_get(lo);
199         it->oiq_obj   = osd_dt_obj(dt);
200         it->oiq_reset = 1;
201
202         RETURN((struct dt_it *)it);
203 }
204
205 /**
206  * Free given iterator.
207  *
208  * \param  di   - osd iterator
209  */
210 static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
211 {
212         struct osd_it_quota     *it     = (struct osd_it_quota *)di;
213         ENTRY;
214
215         osd_zap_cursor_fini(it->oiq_zc);
216         osd_object_put(env, it->oiq_obj);
217         OBD_FREE_PTR(it);
218
219         EXIT;
220 }
221
222 /**
223  * Move on to the next valid entry.
224  *
225  * \param  di   - osd iterator
226  *
227  * \retval +ve  - iterator reached the end
228  * \retval   0  - iterator has not reached the end yet
229  * \retval -ve  - unexpected failure
230  */
231 static int osd_it_acct_next(const struct lu_env *env, struct dt_it *di)
232 {
233         struct osd_it_quota     *it = (struct osd_it_quota *)di;
234         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
235         int                      rc;
236         ENTRY;
237
238         if (it->oiq_reset == 0)
239                 zap_cursor_advance(it->oiq_zc);
240         it->oiq_reset = 0;
241         rc = -zap_cursor_retrieve(it->oiq_zc, za);
242         if (rc == -ENOENT) /* reached the end */
243                 rc = 1;
244         RETURN(rc);
245 }
246
247 /**
248  * Return pointer to the key under iterator.
249  *
250  * \param  di   - osd iterator
251  */
252 static struct dt_key *osd_it_acct_key(const struct lu_env *env,
253                                       const struct dt_it *di)
254 {
255         struct osd_it_quota     *it = (struct osd_it_quota *)di;
256         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
257         int                      rc;
258         ENTRY;
259
260         it->oiq_reset = 0;
261         rc = -zap_cursor_retrieve(it->oiq_zc, za);
262         if (rc)
263                 RETURN(ERR_PTR(rc));
264         rc = kstrtoull(za->za_name, 16, &it->oiq_id);
265
266         RETURN((struct dt_key *) &it->oiq_id);
267 }
268
269 /**
270  * Return size of key under iterator (in bytes)
271  *
272  * \param  di   - osd iterator
273  */
274 static int osd_it_acct_key_size(const struct lu_env *env,
275                                 const struct dt_it *di)
276 {
277         ENTRY;
278         RETURN((int)sizeof(uint64_t));
279 }
280
281 /*
282  * zap_cursor_retrieve read from current record.
283  * to read bytes we need to call zap_lookup explicitly.
284  */
285 static int osd_zap_cursor_retrieve_value(const struct lu_env *env,
286                                          zap_cursor_t *zc,  char *buf,
287                                          int buf_size, int *bytes_read)
288 {
289         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
290         int rc, actual_size;
291
292         rc = -zap_cursor_retrieve(zc, za);
293         if (unlikely(rc != 0))
294                 return -rc;
295
296         if (unlikely(za->za_integer_length <= 0))
297                 return -ERANGE;
298
299         actual_size = za->za_integer_length * za->za_num_integers;
300
301         if (actual_size > buf_size) {
302                 actual_size = buf_size;
303                 buf_size = actual_size / za->za_integer_length;
304         } else {
305                 buf_size = za->za_num_integers;
306         }
307
308         rc = -zap_lookup(zc->zc_objset, zc->zc_zapobj,
309                          za->za_name, za->za_integer_length,
310                          buf_size, buf);
311
312         if (likely(rc == 0))
313                 *bytes_read = actual_size;
314
315         return rc;
316 }
317
318 /**
319  * Return pointer to the record under iterator.
320  *
321  * \param  di    - osd iterator
322  * \param  attr  - not used
323  */
324 static int osd_it_acct_rec(const struct lu_env *env,
325                            const struct dt_it *di,
326                            struct dt_rec *dtrec, __u32 attr)
327 {
328         struct osd_thread_info  *info = osd_oti_get(env);
329         zap_attribute_t         *za = &info->oti_za;
330         struct osd_it_quota     *it = (struct osd_it_quota *)di;
331         struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
332         struct osd_object       *obj = it->oiq_obj;
333         struct osd_device       *osd = osd_obj2dev(obj);
334         int                      bytes_read;
335         int                      rc;
336         ENTRY;
337
338         it->oiq_reset = 0;
339         rec->ispace = rec->bspace = 0;
340
341         /* retrieve block usage from the DMU accounting object */
342         rc = osd_zap_cursor_retrieve_value(env, it->oiq_zc,
343                                            (char *)&rec->bspace,
344                                            sizeof(uint64_t), &bytes_read);
345         if (rc)
346                 RETURN(rc);
347
348         if (!osd_dmu_userobj_accounting_available(osd)) {
349                 if (rec->bspace != 0)
350                         /* estimate #inodes in use */
351                         rec->ispace = osd_objset_user_iused(osd, rec->bspace);
352                 RETURN(0);
353         }
354
355         /* retrieve key associated with the current cursor */
356         rc = -zap_cursor_retrieve(it->oiq_zc, za);
357         if (unlikely(rc != 0))
358                 RETURN(rc);
359
360         /* inode accounting is not maintained by DMU, so we use our own ZAP to
361          * track inode usage */
362         rc = -zap_lookup(osd->od_os, it->oiq_obj->oo_dn->dn_object,
363                          za->za_name, sizeof(uint64_t), 1, &rec->ispace);
364         if (rc == -ENOENT)
365                 /* user/group has not created any file yet */
366                 CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
367                        osd->od_svname, za->za_name);
368         else if (rc)
369                 RETURN(rc);
370
371         RETURN(0);
372 }
373
374 /**
375  * Returns cookie for current Iterator position.
376  *
377  * \param  di    - osd iterator
378  */
379 static __u64 osd_it_acct_store(const struct lu_env *env,
380                                const struct dt_it *di)
381 {
382         struct osd_it_quota *it = (struct osd_it_quota *)di;
383         ENTRY;
384         it->oiq_reset = 0;
385         RETURN(osd_zap_cursor_serialize(it->oiq_zc));
386 }
387
388 /**
389  * Restore iterator from cookie. if the \a hash isn't found,
390  * restore the first valid record.
391  *
392  * \param  di    - osd iterator
393  * \param  hash  - iterator location cookie
394  *
395  * \retval +ve  - di points to exact matched key
396  * \retval  0   - di points to the first valid record
397  * \retval -ve  - failure
398  */
399 static int osd_it_acct_load(const struct lu_env *env,
400                             const struct dt_it *di, __u64 hash)
401 {
402         struct osd_it_quota     *it  = (struct osd_it_quota *)di;
403         struct osd_device       *osd = osd_obj2dev(it->oiq_obj);
404         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
405         zap_cursor_t            *zc;
406         int                      rc;
407         ENTRY;
408
409         /* create new cursor pointing to the new hash */
410         rc = osd_zap_cursor_init(&zc, osd->od_os, it->oiq_oid, hash);
411         if (rc)
412                 RETURN(rc);
413         osd_zap_cursor_fini(it->oiq_zc);
414         it->oiq_zc = zc;
415         it->oiq_reset = 0;
416
417         rc = -zap_cursor_retrieve(it->oiq_zc, za);
418         if (rc == 0)
419                 rc = 1;
420         else if (rc == -ENOENT)
421                 rc = 0;
422
423         RETURN(rc);
424 }
425
426 /**
427  * Move Iterator to record specified by \a key, if the \a key isn't found,
428  * move to the first valid record.
429  *
430  * \param  di   - osd iterator
431  * \param  key  - uid or gid
432  *
433  * \retval +ve  - di points to exact matched key
434  * \retval 0    - di points to the first valid record
435  * \retval -ve  - failure
436  */
437 static int osd_it_acct_get(const struct lu_env *env, struct dt_it *di,
438                 const struct dt_key *key)
439 {
440         ENTRY;
441
442         /* XXX: like osd_zap_it_get(), API is currently broken */
443         LASSERT(*((__u64 *)key) == 0);
444
445         RETURN(osd_it_acct_load(env, di, 0));
446 }
447
448 /**
449  * Release Iterator
450  *
451  * \param  di   - osd iterator
452  */
453 static void osd_it_acct_put(const struct lu_env *env, struct dt_it *di)
454 {
455 }
456
457 /**
458  * Index and Iterator operations for accounting objects
459  */
460 const struct dt_index_operations osd_acct_index_ops = {
461         .dio_lookup = osd_acct_index_lookup,
462         .dio_it     = {
463                 .init           = osd_it_acct_init,
464                 .fini           = osd_it_acct_fini,
465                 .get            = osd_it_acct_get,
466                 .put            = osd_it_acct_put,
467                 .next           = osd_it_acct_next,
468                 .key            = osd_it_acct_key,
469                 .key_size       = osd_it_acct_key_size,
470                 .rec            = osd_it_acct_rec,
471                 .store          = osd_it_acct_store,
472                 .load           = osd_it_acct_load
473         }
474 };
475
476 /**
477  * Quota Enforcement Management
478  */
479
480 /*
481  * Wrapper for qsd_op_begin().
482  *
483  * \param env    - the environment passed by the caller
484  * \param osd    - is the osd_device
485  * \param uid    - user id of the inode
486  * \param gid    - group id of the inode
487  * \param space  - how many blocks/inodes will be consumed/released
488  * \param oh     - osd transaction handle
489  * \param is_blk - block quota or inode quota?
490  * \param flags  - if the operation is write, return no user quota, no
491  *                  group quota, or sync commit flags to the caller
492  * \param force  - set to 1 when changes are performed by root user and thus
493  *                  can't failed with EDQUOT
494  *
495  * \retval 0      - success
496  * \retval -ve    - failure
497  */
498 int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
499                       qid_t uid, qid_t gid, long long space,
500                       struct osd_thandle *oh, bool is_blk, int *flags,
501                       bool force)
502 {
503         struct osd_thread_info  *info = osd_oti_get(env);
504         struct lquota_id_info   *qi = &info->oti_qi;
505         struct qsd_instance     *qsd = osd->od_quota_slave;
506         int                      rcu, rcg; /* user & group rc */
507         ENTRY;
508
509         if (unlikely(qsd == NULL))
510                 /* quota slave instance hasn't been allocated yet */
511                 RETURN(0);
512
513         /* let's start with user quota */
514         qi->lqi_id.qid_uid = uid;
515         qi->lqi_type       = USRQUOTA;
516         qi->lqi_space      = space;
517         qi->lqi_is_blk     = is_blk;
518         rcu = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
519
520         if (force && (rcu == -EDQUOT || rcu == -EINPROGRESS))
521                 /* ignore EDQUOT & EINPROGRESS when changes are done by root */
522                 rcu = 0;
523
524         /* For non-fatal error, we want to continue to get the noquota flags
525          * for group id. This is only for commit write, which has @flags passed
526          * in. See osd_declare_write_commit().
527          * When force is set to true, we also want to proceed with the gid */
528         if (rcu && (rcu != -EDQUOT || flags == NULL))
529                 RETURN(rcu);
530
531         /* and now group quota */
532         qi->lqi_id.qid_gid = gid;
533         qi->lqi_type       = GRPQUOTA;
534         rcg = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
535
536         if (force && (rcg == -EDQUOT || rcg == -EINPROGRESS))
537                 /* as before, ignore EDQUOT & EINPROGRESS for root */
538                 rcg = 0;
539
540         RETURN(rcu ? rcu : rcg);
541 }