Whamcloud - gitweb
LU-4821 osd: cleanups in osd-zfs
[fs/lustre-release.git] / lustre / osd-zfs / osd_quota.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann@whamcloud.com>
28  */
29
30 #include <lustre_quota.h>
31 #include <obd.h>
32 #include "osd_internal.h"
33
34 /**
35  * Helper function to retrieve DMU object id from fid for accounting object
36  */
37 uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
38 {
39         LASSERT(fid_is_acct(fid));
40         if (fid_oid(fid) == ACCT_GROUP_OID)
41                 return DMU_GROUPUSED_OBJECT;
42         return DMU_USERUSED_OBJECT;
43 }
44
45 /**
46  * Helper function to estimate the number of inodes in use for a give uid/gid
47  * from the block usage
48  */
49 static uint64_t osd_objset_user_iused(struct osd_device *osd, uint64_t uidbytes)
50 {
51         uint64_t refdbytes, availbytes, usedobjs, availobjs;
52         uint64_t uidobjs;
53
54         /* get fresh statfs info */
55         dmu_objset_space(osd->od_os, &refdbytes, &availbytes,
56                          &usedobjs, &availobjs);
57
58         /* estimate the number of objects based on the disk usage */
59         uidobjs = osd_objs_count_estimate(refdbytes, usedobjs,
60                                           uidbytes >> SPA_MAXBLOCKSHIFT);
61         if (uidbytes > 0)
62                 /* if we have at least 1 byte, we have at least one dnode ... */
63                 uidobjs = max_t(uint64_t, uidobjs, 1);
64
65         return uidobjs;
66 }
67
68 /**
69  * Space Accounting Management
70  */
71
72 /**
73  * Return space usage consumed by a given uid or gid.
74  * Block usage is accurrate since it is maintained by DMU itself.
75  * However, DMU does not provide inode accounting, so the #inodes in use
76  * is estimated from the block usage and statfs information.
77  *
78  * \param env   - is the environment passed by the caller
79  * \param dtobj - is the accounting object
80  * \param dtrec - is the record to fill with space usage information
81  * \param dtkey - is the id the of the user or group for which we would
82  *                like to access disk usage.
83  * \param capa - is the capability, not used.
84  *
85  * \retval +ve - success : exact match
86  * \retval -ve - failure
87  */
88 static int osd_acct_index_lookup(const struct lu_env *env,
89                                 struct dt_object *dtobj,
90                                 struct dt_rec *dtrec,
91                                 const struct dt_key *dtkey,
92                                 struct lustre_capa *capa)
93 {
94         struct osd_thread_info  *info = osd_oti_get(env);
95         char                    *buf  = info->oti_buf;
96         struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
97         struct osd_object       *obj = osd_dt_obj(dtobj);
98         struct osd_device       *osd = osd_obj2dev(obj);
99         int                      rc;
100         uint64_t                 oid;
101         ENTRY;
102
103         rec->bspace = rec->ispace = 0;
104
105         /* convert the 64-bit uid/gid into a string */
106         sprintf(buf, "%llx", *((__u64 *)dtkey));
107         /* fetch DMU object ID (DMU_USERUSED_OBJECT/DMU_GROUPUSED_OBJECT) to be
108          * used */
109         oid = osd_quota_fid2dmu(lu_object_fid(&dtobj->do_lu));
110
111         /* disk usage (in bytes) is maintained by DMU.
112          * DMU_USERUSED_OBJECT/DMU_GROUPUSED_OBJECT are special objects which
113          * not associated with any dmu_but_t (see dnode_special_open()).
114          * As a consequence, we cannot use udmu_zap_lookup() here since it
115          * requires a valid oo_db. */
116         rc = -zap_lookup(osd->od_os, oid, buf, sizeof(uint64_t), 1,
117                         &rec->bspace);
118         if (rc == -ENOENT)
119                 /* user/group has not created anything yet */
120                 CDEBUG(D_QUOTA, "%s: id %s not found in DMU accounting ZAP\n",
121                        osd->od_svname, buf);
122         else if (rc)
123                 RETURN(rc);
124
125         if (osd->od_quota_iused_est) {
126                 if (rec->bspace != 0)
127                         /* estimate #inodes in use */
128                         rec->ispace = osd_objset_user_iused(osd, rec->bspace);
129                 RETURN(+1);
130         }
131
132         /* as for inode accounting, it is not maintained by DMU, so we just
133          * use our own ZAP to track inode usage */
134         rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
135                          buf, sizeof(uint64_t), 1, &rec->ispace);
136         if (rc == -ENOENT)
137                 /* user/group has not created any file yet */
138                 CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
139                        osd->od_svname, buf);
140         else if (rc)
141                 RETURN(rc);
142
143         RETURN(+1);
144 }
145
146 /**
147  * Initialize osd Iterator for given osd index object.
148  *
149  * \param  dt    - osd index object
150  * \param  attr  - not used
151  * \param  capa  - BYPASS_CAPA
152  */
153 static struct dt_it *osd_it_acct_init(const struct lu_env *env,
154                                       struct dt_object *dt,
155                                       __u32 attr,
156                                       struct lustre_capa *capa)
157 {
158         struct osd_thread_info  *info = osd_oti_get(env);
159         struct osd_it_quota     *it;
160         struct lu_object        *lo   = &dt->do_lu;
161         struct osd_device       *osd  = osd_dev(lo->lo_dev);
162         int                      rc;
163         ENTRY;
164
165         LASSERT(lu_object_exists(lo));
166
167         if (info == NULL)
168                 RETURN(ERR_PTR(-ENOMEM));
169
170         it = &info->oti_it_quota;
171         memset(it, 0, sizeof(*it));
172         it->oiq_oid = osd_quota_fid2dmu(lu_object_fid(lo));
173
174         /* initialize zap cursor */
175         rc = osd_zap_cursor_init(&it->oiq_zc, osd->od_os, it->oiq_oid, 0);
176         if (rc)
177                 RETURN(ERR_PTR(rc));
178
179         /* take object reference */
180         lu_object_get(lo);
181         it->oiq_obj   = osd_dt_obj(dt);
182         it->oiq_reset = 1;
183
184         RETURN((struct dt_it *)it);
185 }
186
187 /**
188  * Free given iterator.
189  *
190  * \param  di   - osd iterator
191  */
192 static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
193 {
194         struct osd_it_quota *it = (struct osd_it_quota *)di;
195         ENTRY;
196         osd_zap_cursor_fini(it->oiq_zc);
197         lu_object_put(env, &it->oiq_obj->oo_dt.do_lu);
198         EXIT;
199 }
200
201 /**
202  * Move on to the next valid entry.
203  *
204  * \param  di   - osd iterator
205  *
206  * \retval +ve  - iterator reached the end
207  * \retval   0  - iterator has not reached the end yet
208  * \retval -ve  - unexpected failure
209  */
210 static int osd_it_acct_next(const struct lu_env *env, struct dt_it *di)
211 {
212         struct osd_it_quota     *it = (struct osd_it_quota *)di;
213         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
214         int                      rc;
215         ENTRY;
216
217         if (it->oiq_reset == 0)
218                 zap_cursor_advance(it->oiq_zc);
219         it->oiq_reset = 0;
220         rc = -zap_cursor_retrieve(it->oiq_zc, za);
221         if (rc == -ENOENT) /* reached the end */
222                 rc = 1;
223         RETURN(rc);
224 }
225
226 /**
227  * Return pointer to the key under iterator.
228  *
229  * \param  di   - osd iterator
230  */
231 static struct dt_key *osd_it_acct_key(const struct lu_env *env,
232                                       const struct dt_it *di)
233 {
234         struct osd_it_quota     *it = (struct osd_it_quota *)di;
235         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
236         int                      rc;
237         ENTRY;
238
239         it->oiq_reset = 0;
240         rc = -zap_cursor_retrieve(it->oiq_zc, za);
241         if (rc)
242                 RETURN(ERR_PTR(rc));
243         rc = kstrtoull(za->za_name, 16, &it->oiq_id);
244
245         RETURN((struct dt_key *) &it->oiq_id);
246 }
247
248 /**
249  * Return size of key under iterator (in bytes)
250  *
251  * \param  di   - osd iterator
252  */
253 static int osd_it_acct_key_size(const struct lu_env *env,
254                                 const struct dt_it *di)
255 {
256         ENTRY;
257         RETURN((int)sizeof(uint64_t));
258 }
259
260 /*
261  * zap_cursor_retrieve read from current record.
262  * to read bytes we need to call zap_lookup explicitly.
263  */
264 static int osd_zap_cursor_retrieve_value(const struct lu_env *env,
265                                          zap_cursor_t *zc,  char *buf,
266                                          int buf_size, int *bytes_read)
267 {
268         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
269         int rc, actual_size;
270
271         rc = -zap_cursor_retrieve(zc, za);
272         if (unlikely(rc != 0))
273                 return -rc;
274
275         if (unlikely(za->za_integer_length <= 0))
276                 return -ERANGE;
277
278         actual_size = za->za_integer_length * za->za_num_integers;
279
280         if (actual_size > buf_size) {
281                 actual_size = buf_size;
282                 buf_size = actual_size / za->za_integer_length;
283         } else {
284                 buf_size = za->za_num_integers;
285         }
286
287         rc = -zap_lookup(zc->zc_objset, zc->zc_zapobj,
288                          za->za_name, za->za_integer_length,
289                          buf_size, buf);
290
291         if (likely(rc == 0))
292                 *bytes_read = actual_size;
293
294         return rc;
295 }
296
297 /**
298  * Return pointer to the record under iterator.
299  *
300  * \param  di    - osd iterator
301  * \param  attr  - not used
302  */
303 static int osd_it_acct_rec(const struct lu_env *env,
304                            const struct dt_it *di,
305                            struct dt_rec *dtrec, __u32 attr)
306 {
307         struct osd_thread_info  *info = osd_oti_get(env);
308         zap_attribute_t         *za = &info->oti_za;
309         struct osd_it_quota     *it = (struct osd_it_quota *)di;
310         struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
311         struct osd_object       *obj = it->oiq_obj;
312         struct osd_device       *osd = osd_obj2dev(obj);
313         int                      bytes_read;
314         int                      rc;
315         ENTRY;
316
317         it->oiq_reset = 0;
318         rec->ispace = rec->bspace = 0;
319
320         /* retrieve block usage from the DMU accounting object */
321         rc = osd_zap_cursor_retrieve_value(env, it->oiq_zc,
322                                            (char *)&rec->bspace,
323                                            sizeof(uint64_t), &bytes_read);
324         if (rc)
325                 RETURN(rc);
326
327         if (osd->od_quota_iused_est) {
328                 if (rec->bspace != 0)
329                         /* estimate #inodes in use */
330                         rec->ispace = osd_objset_user_iused(osd, rec->bspace);
331                 RETURN(0);
332         }
333
334         /* retrieve key associated with the current cursor */
335         rc = -zap_cursor_retrieve(it->oiq_zc, za);
336         if (unlikely(rc != 0))
337                 RETURN(rc);
338
339         /* inode accounting is not maintained by DMU, so we use our own ZAP to
340          * track inode usage */
341         rc = -zap_lookup(osd->od_os, it->oiq_obj->oo_db->db_object,
342                          za->za_name, sizeof(uint64_t), 1, &rec->ispace);
343         if (rc == -ENOENT)
344                 /* user/group has not created any file yet */
345                 CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
346                        osd->od_svname, za->za_name);
347         else if (rc)
348                 RETURN(rc);
349
350         RETURN(0);
351 }
352
353 /**
354  * Returns cookie for current Iterator position.
355  *
356  * \param  di    - osd iterator
357  */
358 static __u64 osd_it_acct_store(const struct lu_env *env,
359                                const struct dt_it *di)
360 {
361         struct osd_it_quota *it = (struct osd_it_quota *)di;
362         ENTRY;
363         it->oiq_reset = 0;
364         RETURN(osd_zap_cursor_serialize(it->oiq_zc));
365 }
366
367 /**
368  * Restore iterator from cookie. if the \a hash isn't found,
369  * restore the first valid record.
370  *
371  * \param  di    - osd iterator
372  * \param  hash  - iterator location cookie
373  *
374  * \retval +ve  - di points to exact matched key
375  * \retval  0   - di points to the first valid record
376  * \retval -ve  - failure
377  */
378 static int osd_it_acct_load(const struct lu_env *env,
379                             const struct dt_it *di, __u64 hash)
380 {
381         struct osd_it_quota     *it  = (struct osd_it_quota *)di;
382         struct osd_device       *osd = osd_obj2dev(it->oiq_obj);
383         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
384         zap_cursor_t            *zc;
385         int                      rc;
386         ENTRY;
387
388         /* create new cursor pointing to the new hash */
389         rc = osd_zap_cursor_init(&zc, osd->od_os, it->oiq_oid, hash);
390         if (rc)
391                 RETURN(rc);
392         osd_zap_cursor_fini(it->oiq_zc);
393         it->oiq_zc = zc;
394         it->oiq_reset = 0;
395
396         rc = -zap_cursor_retrieve(it->oiq_zc, za);
397         if (rc == 0)
398                 rc = 1;
399         else if (rc == -ENOENT)
400                 rc = 0;
401
402         RETURN(rc);
403 }
404
405 /**
406  * Move Iterator to record specified by \a key, if the \a key isn't found,
407  * move to the first valid record.
408  *
409  * \param  di   - osd iterator
410  * \param  key  - uid or gid
411  *
412  * \retval +ve  - di points to exact matched key
413  * \retval 0    - di points to the first valid record
414  * \retval -ve  - failure
415  */
416 static int osd_it_acct_get(const struct lu_env *env, struct dt_it *di,
417                 const struct dt_key *key)
418 {
419         ENTRY;
420
421         /* XXX: like osd_zap_it_get(), API is currently broken */
422         LASSERT(*((__u64 *)key) == 0);
423
424         RETURN(osd_it_acct_load(env, di, 0));
425 }
426
427 /**
428  * Release Iterator
429  *
430  * \param  di   - osd iterator
431  */
432 static void osd_it_acct_put(const struct lu_env *env, struct dt_it *di)
433 {
434 }
435
436 /**
437  * Index and Iterator operations for accounting objects
438  */
439 const struct dt_index_operations osd_acct_index_ops = {
440         .dio_lookup = osd_acct_index_lookup,
441         .dio_it     = {
442                 .init           = osd_it_acct_init,
443                 .fini           = osd_it_acct_fini,
444                 .get            = osd_it_acct_get,
445                 .put            = osd_it_acct_put,
446                 .next           = osd_it_acct_next,
447                 .key            = osd_it_acct_key,
448                 .key_size       = osd_it_acct_key_size,
449                 .rec            = osd_it_acct_rec,
450                 .store          = osd_it_acct_store,
451                 .load           = osd_it_acct_load
452         }
453 };
454
455 /**
456  * Quota Enforcement Management
457  */
458
459 /*
460  * Wrapper for qsd_op_begin().
461  *
462  * \param env    - the environment passed by the caller
463  * \param osd    - is the osd_device
464  * \param uid    - user id of the inode
465  * \param gid    - group id of the inode
466  * \param space  - how many blocks/inodes will be consumed/released
467  * \param oh     - osd transaction handle
468  * \param is_blk - block quota or inode quota?
469  * \param flags  - if the operation is write, return no user quota, no
470  *                  group quota, or sync commit flags to the caller
471  * \param force  - set to 1 when changes are performed by root user and thus
472  *                  can't failed with EDQUOT
473  *
474  * \retval 0      - success
475  * \retval -ve    - failure
476  */
477 int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
478                       qid_t uid, qid_t gid, long long space,
479                       struct osd_thandle *oh, bool is_blk, int *flags,
480                       bool force)
481 {
482         struct osd_thread_info  *info = osd_oti_get(env);
483         struct lquota_id_info   *qi = &info->oti_qi;
484         struct qsd_instance     *qsd = osd->od_quota_slave;
485         int                      rcu, rcg; /* user & group rc */
486         ENTRY;
487
488         if (unlikely(qsd == NULL))
489                 /* quota slave instance hasn't been allocated yet */
490                 RETURN(0);
491
492         /* let's start with user quota */
493         qi->lqi_id.qid_uid = uid;
494         qi->lqi_type       = USRQUOTA;
495         qi->lqi_space      = space;
496         qi->lqi_is_blk     = is_blk;
497         rcu = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
498
499         if (force && (rcu == -EDQUOT || rcu == -EINPROGRESS))
500                 /* ignore EDQUOT & EINPROGRESS when changes are done by root */
501                 rcu = 0;
502
503         /* For non-fatal error, we want to continue to get the noquota flags
504          * for group id. This is only for commit write, which has @flags passed
505          * in. See osd_declare_write_commit().
506          * When force is set to true, we also want to proceed with the gid */
507         if (rcu && (rcu != -EDQUOT || flags == NULL))
508                 RETURN(rcu);
509
510         /* and now group quota */
511         qi->lqi_id.qid_gid = gid;
512         qi->lqi_type       = GRPQUOTA;
513         rcg = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
514
515         if (force && (rcg == -EDQUOT || rcg == -EINPROGRESS))
516                 /* as before, ignore EDQUOT & EINPROGRESS for root */
517                 rcg = 0;
518
519         RETURN(rcu ? rcu : rcg);
520 }