Whamcloud - gitweb
LU-2435 osd-zfs: use zfs native dnode accounting
[fs/lustre-release.git] / lustre / osd-zfs / osd_quota.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2015, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann@whamcloud.com>
28  */
29
30 #include <lustre_quota.h>
31 #include <obd.h>
32 #include "osd_internal.h"
33
34 /**
35  * Helper function to retrieve DMU object id from fid for accounting object
36  */
37 uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
38 {
39         LASSERT(fid_is_acct(fid));
40         if (fid_oid(fid) == ACCT_GROUP_OID)
41                 return DMU_GROUPUSED_OBJECT;
42         return DMU_USERUSED_OBJECT;
43 }
44
45 /**
46  * Helper function to estimate the number of inodes in use for a give uid/gid
47  * from the block usage
48  */
49 static uint64_t osd_objset_user_iused(struct osd_device *osd, uint64_t uidbytes)
50 {
51         uint64_t refdbytes, availbytes, usedobjs, availobjs;
52         uint64_t uidobjs, bshift;
53
54         /* get fresh statfs info */
55         dmu_objset_space(osd->od_os, &refdbytes, &availbytes,
56                          &usedobjs, &availobjs);
57
58         /* estimate the number of objects based on the disk usage */
59         bshift = fls64(osd->od_max_blksz) - 1;
60         uidobjs = osd_objs_count_estimate(refdbytes, usedobjs,
61                                           uidbytes >> bshift, bshift);
62         if (uidbytes > 0)
63                 /* if we have at least 1 byte, we have at least one dnode ... */
64                 uidobjs = max_t(uint64_t, uidobjs, 1);
65
66         return uidobjs;
67 }
68
69 /**
70  * Space Accounting Management
71  */
72
73 /**
74  * Return space usage consumed by a given uid or gid.
75  * Block usage is accurrate since it is maintained by DMU itself.
76  * However, DMU does not provide inode accounting, so the #inodes in use
77  * is estimated from the block usage and statfs information.
78  *
79  * \param env   - is the environment passed by the caller
80  * \param dtobj - is the accounting object
81  * \param dtrec - is the record to fill with space usage information
82  * \param dtkey - is the id the of the user or group for which we would
83  *                like to access disk usage.
84  *
85  * \retval +ve - success : exact match
86  * \retval -ve - failure
87  */
88 static int osd_acct_index_lookup(const struct lu_env *env,
89                                 struct dt_object *dtobj,
90                                 struct dt_rec *dtrec,
91                                 const struct dt_key *dtkey)
92 {
93         struct osd_thread_info  *info = osd_oti_get(env);
94         char                    *buf  = info->oti_buf;
95         size_t                   buflen = sizeof(info->oti_buf);
96         struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
97         struct osd_object       *obj = osd_dt_obj(dtobj);
98         struct osd_device       *osd = osd_obj2dev(obj);
99         int                      rc;
100         uint64_t                 oid;
101         ENTRY;
102
103         rec->bspace = rec->ispace = 0;
104
105         /* convert the 64-bit uid/gid into a string */
106         snprintf(buf, buflen, "%llx", *((__u64 *)dtkey));
107         /* fetch DMU object ID (DMU_USERUSED_OBJECT/DMU_GROUPUSED_OBJECT) to be
108          * used */
109         oid = osd_quota_fid2dmu(lu_object_fid(&dtobj->do_lu));
110
111         /* disk usage (in bytes) is maintained by DMU.
112          * DMU_USERUSED_OBJECT/DMU_GROUPUSED_OBJECT are special objects which
113          * not associated with any dmu_but_t (see dnode_special_open()). */
114         rc = zap_lookup(osd->od_os, oid, buf, sizeof(uint64_t), 1,
115                         &rec->bspace);
116         if (rc == -ENOENT) {
117                 /* user/group has not created anything yet */
118                 CDEBUG(D_QUOTA, "%s: id %s not found in DMU accounting ZAP\n",
119                        osd->od_svname, buf);
120         } else if (rc) {
121                 RETURN(rc);
122         }
123
124         if (!osd_dmu_userobj_accounting_available(osd)) {
125                 if (rec->bspace != 0)
126                         /* estimate #inodes in use */
127                         rec->ispace = osd_objset_user_iused(osd, rec->bspace);
128                 rc = 1;
129         } else {
130                 snprintf(buf, buflen, OSD_DMU_USEROBJ_PREFIX "%llx",
131                          *((__u64 *)dtkey));
132                 rc = zap_lookup(osd->od_os, oid, buf, sizeof(uint64_t), 1,
133                                 &rec->ispace);
134                 if (rc == -ENOENT) {
135                         CDEBUG(D_QUOTA,
136                                "%s: id %s not found dnode accounting\n",
137                                osd->od_svname, buf);
138                 } else if (rc == 0) {
139                         rc = 1;
140                 }
141         }
142
143         RETURN(rc);
144 }
145
146 /**
147  * Initialize osd Iterator for given osd index object.
148  *
149  * \param  dt    - osd index object
150  * \param  attr  - not used
151  */
152 static struct dt_it *osd_it_acct_init(const struct lu_env *env,
153                                       struct dt_object *dt,
154                                       __u32 attr)
155 {
156         struct osd_thread_info  *info = osd_oti_get(env);
157         struct osd_it_quota     *it;
158         struct lu_object        *lo   = &dt->do_lu;
159         struct osd_device       *osd  = osd_dev(lo->lo_dev);
160         int                      rc;
161         ENTRY;
162
163         LASSERT(lu_object_exists(lo));
164
165         if (info == NULL)
166                 RETURN(ERR_PTR(-ENOMEM));
167
168         OBD_ALLOC_PTR(it);
169         if (it == NULL)
170                 RETURN(ERR_PTR(-ENOMEM));
171
172         memset(it, 0, sizeof(*it));
173         it->oiq_oid = osd_quota_fid2dmu(lu_object_fid(lo));
174
175         /* initialize zap cursor */
176         rc = osd_zap_cursor_init(&it->oiq_zc, osd->od_os, it->oiq_oid, 0);
177         if (rc != 0) {
178                 OBD_FREE_PTR(it);
179                 RETURN(ERR_PTR(rc));
180         }
181
182         /* take object reference */
183         lu_object_get(lo);
184         it->oiq_obj   = osd_dt_obj(dt);
185         it->oiq_reset = 1;
186
187         RETURN((struct dt_it *)it);
188 }
189
190 /**
191  * Free given iterator.
192  *
193  * \param  di   - osd iterator
194  */
195 static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
196 {
197         struct osd_it_quota     *it     = (struct osd_it_quota *)di;
198         ENTRY;
199
200         osd_zap_cursor_fini(it->oiq_zc);
201         osd_object_put(env, it->oiq_obj);
202         OBD_FREE_PTR(it);
203
204         EXIT;
205 }
206
207 /**
208  * Move on to the next valid entry.
209  *
210  * \param  di   - osd iterator
211  *
212  * \retval +ve  - iterator reached the end
213  * \retval   0  - iterator has not reached the end yet
214  * \retval -ve  - unexpected failure
215  */
216 static int osd_it_acct_next(const struct lu_env *env, struct dt_it *di)
217 {
218         struct osd_it_quota     *it = (struct osd_it_quota *)di;
219         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
220         int                      rc;
221         ENTRY;
222
223         if (it->oiq_reset == 0)
224                 zap_cursor_advance(it->oiq_zc);
225         it->oiq_reset = 0;
226         rc = -zap_cursor_retrieve(it->oiq_zc, za);
227         if (rc == -ENOENT) /* reached the end */
228                 rc = 1;
229         RETURN(rc);
230 }
231
232 /**
233  * Return pointer to the key under iterator.
234  *
235  * \param  di   - osd iterator
236  */
237 static struct dt_key *osd_it_acct_key(const struct lu_env *env,
238                                       const struct dt_it *di)
239 {
240         struct osd_it_quota     *it = (struct osd_it_quota *)di;
241         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
242         int                      rc;
243         ENTRY;
244
245         it->oiq_reset = 0;
246         rc = -zap_cursor_retrieve(it->oiq_zc, za);
247         if (rc)
248                 RETURN(ERR_PTR(rc));
249         rc = kstrtoull(za->za_name, 16, &it->oiq_id);
250
251         RETURN((struct dt_key *) &it->oiq_id);
252 }
253
254 /**
255  * Return size of key under iterator (in bytes)
256  *
257  * \param  di   - osd iterator
258  */
259 static int osd_it_acct_key_size(const struct lu_env *env,
260                                 const struct dt_it *di)
261 {
262         ENTRY;
263         RETURN((int)sizeof(uint64_t));
264 }
265
266 /*
267  * zap_cursor_retrieve read from current record.
268  * to read bytes we need to call zap_lookup explicitly.
269  */
270 static int osd_zap_cursor_retrieve_value(const struct lu_env *env,
271                                          zap_cursor_t *zc,  char *buf,
272                                          int buf_size, int *bytes_read)
273 {
274         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
275         int rc, actual_size;
276
277         rc = -zap_cursor_retrieve(zc, za);
278         if (unlikely(rc != 0))
279                 return -rc;
280
281         if (unlikely(za->za_integer_length <= 0))
282                 return -ERANGE;
283
284         actual_size = za->za_integer_length * za->za_num_integers;
285
286         if (actual_size > buf_size) {
287                 actual_size = buf_size;
288                 buf_size = actual_size / za->za_integer_length;
289         } else {
290                 buf_size = za->za_num_integers;
291         }
292
293         rc = -zap_lookup(zc->zc_objset, zc->zc_zapobj,
294                          za->za_name, za->za_integer_length,
295                          buf_size, buf);
296
297         if (likely(rc == 0))
298                 *bytes_read = actual_size;
299
300         return rc;
301 }
302
303 /**
304  * Return pointer to the record under iterator.
305  *
306  * \param  di    - osd iterator
307  * \param  attr  - not used
308  */
309 static int osd_it_acct_rec(const struct lu_env *env,
310                            const struct dt_it *di,
311                            struct dt_rec *dtrec, __u32 attr)
312 {
313         struct osd_thread_info  *info = osd_oti_get(env);
314         zap_attribute_t         *za = &info->oti_za;
315         struct osd_it_quota     *it = (struct osd_it_quota *)di;
316         struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
317         struct osd_object       *obj = it->oiq_obj;
318         struct osd_device       *osd = osd_obj2dev(obj);
319         int                      bytes_read;
320         int                      rc;
321         ENTRY;
322
323         it->oiq_reset = 0;
324         rec->ispace = rec->bspace = 0;
325
326         /* retrieve block usage from the DMU accounting object */
327         rc = osd_zap_cursor_retrieve_value(env, it->oiq_zc,
328                                            (char *)&rec->bspace,
329                                            sizeof(uint64_t), &bytes_read);
330         if (rc)
331                 RETURN(rc);
332
333         if (!osd_dmu_userobj_accounting_available(osd)) {
334                 if (rec->bspace != 0)
335                         /* estimate #inodes in use */
336                         rec->ispace = osd_objset_user_iused(osd, rec->bspace);
337                 RETURN(0);
338         }
339
340         /* retrieve key associated with the current cursor */
341         rc = -zap_cursor_retrieve(it->oiq_zc, za);
342         if (unlikely(rc != 0))
343                 RETURN(rc);
344
345         /* inode accounting is not maintained by DMU, so we use our own ZAP to
346          * track inode usage */
347         rc = -zap_lookup(osd->od_os, it->oiq_obj->oo_dn->dn_object,
348                          za->za_name, sizeof(uint64_t), 1, &rec->ispace);
349         if (rc == -ENOENT)
350                 /* user/group has not created any file yet */
351                 CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
352                        osd->od_svname, za->za_name);
353         else if (rc)
354                 RETURN(rc);
355
356         RETURN(0);
357 }
358
359 /**
360  * Returns cookie for current Iterator position.
361  *
362  * \param  di    - osd iterator
363  */
364 static __u64 osd_it_acct_store(const struct lu_env *env,
365                                const struct dt_it *di)
366 {
367         struct osd_it_quota *it = (struct osd_it_quota *)di;
368         ENTRY;
369         it->oiq_reset = 0;
370         RETURN(osd_zap_cursor_serialize(it->oiq_zc));
371 }
372
373 /**
374  * Restore iterator from cookie. if the \a hash isn't found,
375  * restore the first valid record.
376  *
377  * \param  di    - osd iterator
378  * \param  hash  - iterator location cookie
379  *
380  * \retval +ve  - di points to exact matched key
381  * \retval  0   - di points to the first valid record
382  * \retval -ve  - failure
383  */
384 static int osd_it_acct_load(const struct lu_env *env,
385                             const struct dt_it *di, __u64 hash)
386 {
387         struct osd_it_quota     *it  = (struct osd_it_quota *)di;
388         struct osd_device       *osd = osd_obj2dev(it->oiq_obj);
389         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
390         zap_cursor_t            *zc;
391         int                      rc;
392         ENTRY;
393
394         /* create new cursor pointing to the new hash */
395         rc = osd_zap_cursor_init(&zc, osd->od_os, it->oiq_oid, hash);
396         if (rc)
397                 RETURN(rc);
398         osd_zap_cursor_fini(it->oiq_zc);
399         it->oiq_zc = zc;
400         it->oiq_reset = 0;
401
402         rc = -zap_cursor_retrieve(it->oiq_zc, za);
403         if (rc == 0)
404                 rc = 1;
405         else if (rc == -ENOENT)
406                 rc = 0;
407
408         RETURN(rc);
409 }
410
411 /**
412  * Move Iterator to record specified by \a key, if the \a key isn't found,
413  * move to the first valid record.
414  *
415  * \param  di   - osd iterator
416  * \param  key  - uid or gid
417  *
418  * \retval +ve  - di points to exact matched key
419  * \retval 0    - di points to the first valid record
420  * \retval -ve  - failure
421  */
422 static int osd_it_acct_get(const struct lu_env *env, struct dt_it *di,
423                 const struct dt_key *key)
424 {
425         ENTRY;
426
427         /* XXX: like osd_zap_it_get(), API is currently broken */
428         LASSERT(*((__u64 *)key) == 0);
429
430         RETURN(osd_it_acct_load(env, di, 0));
431 }
432
433 /**
434  * Release Iterator
435  *
436  * \param  di   - osd iterator
437  */
438 static void osd_it_acct_put(const struct lu_env *env, struct dt_it *di)
439 {
440 }
441
442 /**
443  * Index and Iterator operations for accounting objects
444  */
445 const struct dt_index_operations osd_acct_index_ops = {
446         .dio_lookup = osd_acct_index_lookup,
447         .dio_it     = {
448                 .init           = osd_it_acct_init,
449                 .fini           = osd_it_acct_fini,
450                 .get            = osd_it_acct_get,
451                 .put            = osd_it_acct_put,
452                 .next           = osd_it_acct_next,
453                 .key            = osd_it_acct_key,
454                 .key_size       = osd_it_acct_key_size,
455                 .rec            = osd_it_acct_rec,
456                 .store          = osd_it_acct_store,
457                 .load           = osd_it_acct_load
458         }
459 };
460
461 /**
462  * Quota Enforcement Management
463  */
464
465 /*
466  * Wrapper for qsd_op_begin().
467  *
468  * \param env    - the environment passed by the caller
469  * \param osd    - is the osd_device
470  * \param uid    - user id of the inode
471  * \param gid    - group id of the inode
472  * \param space  - how many blocks/inodes will be consumed/released
473  * \param oh     - osd transaction handle
474  * \param is_blk - block quota or inode quota?
475  * \param flags  - if the operation is write, return no user quota, no
476  *                  group quota, or sync commit flags to the caller
477  * \param force  - set to 1 when changes are performed by root user and thus
478  *                  can't failed with EDQUOT
479  *
480  * \retval 0      - success
481  * \retval -ve    - failure
482  */
483 int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
484                       qid_t uid, qid_t gid, long long space,
485                       struct osd_thandle *oh, bool is_blk, int *flags,
486                       bool force)
487 {
488         struct osd_thread_info  *info = osd_oti_get(env);
489         struct lquota_id_info   *qi = &info->oti_qi;
490         struct qsd_instance     *qsd = osd->od_quota_slave;
491         int                      rcu, rcg; /* user & group rc */
492         ENTRY;
493
494         if (unlikely(qsd == NULL))
495                 /* quota slave instance hasn't been allocated yet */
496                 RETURN(0);
497
498         /* let's start with user quota */
499         qi->lqi_id.qid_uid = uid;
500         qi->lqi_type       = USRQUOTA;
501         qi->lqi_space      = space;
502         qi->lqi_is_blk     = is_blk;
503         rcu = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
504
505         if (force && (rcu == -EDQUOT || rcu == -EINPROGRESS))
506                 /* ignore EDQUOT & EINPROGRESS when changes are done by root */
507                 rcu = 0;
508
509         /* For non-fatal error, we want to continue to get the noquota flags
510          * for group id. This is only for commit write, which has @flags passed
511          * in. See osd_declare_write_commit().
512          * When force is set to true, we also want to proceed with the gid */
513         if (rcu && (rcu != -EDQUOT || flags == NULL))
514                 RETURN(rcu);
515
516         /* and now group quota */
517         qi->lqi_id.qid_gid = gid;
518         qi->lqi_type       = GRPQUOTA;
519         rcg = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
520
521         if (force && (rcg == -EDQUOT || rcg == -EINPROGRESS))
522                 /* as before, ignore EDQUOT & EINPROGRESS for root */
523                 rcg = 0;
524
525         RETURN(rcu ? rcu : rcg);
526 }