Whamcloud - gitweb
LU-4944 osd: support for zfs-0.6.3
[fs/lustre-release.git] / lustre / osd-zfs / osd_quota.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann@whamcloud.com>
28  */
29
30 #include <lustre_quota.h>
31 #include <obd.h>
32 #include "osd_internal.h"
33
34 #include <sys/dnode.h>
35 #include <sys/spa.h>
36 #include <sys/zap.h>
37 #include <sys/dmu_tx.h>
38 #include <sys/dsl_prop.h>
39 #include <sys/txg.h>
40
41 /*
42  * the structure tracks per-ID change/state
43  */
44 struct zfs_id_change {
45         struct hlist_node       zic_hash;
46         __u64                   zic_id;
47         atomic_t                zic_num;
48 };
49
50 /*
51  * callback data for cfs_hash_for_each_safe()
52  * used in txg commit and OSD cleanup path
53  */
54 struct hash_cbdata {
55         struct osd_device       *hcb_osd;
56         uint64_t                 hcb_zapid;
57         dmu_tx_t                *hcb_tx;
58 };
59
60 /**
61  * Helper function to retrieve DMU object id from fid for accounting object
62  */
63 static inline uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
64 {
65         LASSERT(fid_is_acct(fid));
66         if (fid_oid(fid) == ACCT_GROUP_OID)
67                 return DMU_GROUPUSED_OBJECT;
68         return DMU_USERUSED_OBJECT;
69 }
70
71 /*
72  * a note about locking:
73  *  entries in per-OSD cache never go before umount,
74  *  so there is no need in locking for lookups.
75  *
76  *  entries in per-txg deltas never go before txg is closed,
77  *  there is no concurrency between removal/insertions.
78  *
79  * also, given all above, there is no need in reference counting.
80  */
81 static struct zfs_id_change *osd_zfs_lookup_by_id(cfs_hash_t *hash, __u64 id)
82 {
83         struct zfs_id_change    *za = NULL;
84         struct hlist_node       *hnode;
85         cfs_hash_bd_t            bd;
86
87         cfs_hash_bd_get(hash, &id, &bd);
88         hnode = cfs_hash_bd_peek_locked(hash, &bd, &id);
89         if (hnode != NULL)
90                 za = container_of0(hnode, struct zfs_id_change, zic_hash);
91
92         return za;
93 }
94
95 static struct zfs_id_change *lookup_or_create_by_id(struct osd_device *osd,
96                                                 cfs_hash_t *hash, __u64 id)
97 {
98         struct zfs_id_change    *za, *tmp;
99         struct hlist_node       *hnode;
100         cfs_hash_bd_t            bd;
101
102         za = osd_zfs_lookup_by_id(hash, id);
103         if (likely(za != NULL))
104                 return za;
105
106         OBD_ALLOC_PTR(za);
107         if (unlikely(za == NULL))
108                 return NULL;
109
110         za->zic_id = id;
111
112         cfs_hash_bd_get(hash, &id, &bd);
113         spin_lock(&osd->od_known_txg_lock);
114         hnode = cfs_hash_bd_findadd_locked(hash, &bd, &id, &za->zic_hash, 1);
115         LASSERT(hnode != NULL);
116         tmp = container_of0(hnode, struct zfs_id_change, zic_hash);
117         spin_unlock(&osd->od_known_txg_lock);
118
119         if (tmp == za) {
120                 /*
121                  * our structure got into the hash
122                  */
123         } else {
124                 /* somebody won the race, we wasted the cycles */
125                 OBD_FREE_PTR(za);
126         }
127
128         return tmp;
129 }
130
131 /*
132  * used to maintain per-txg deltas
133  */
134 static int osd_zfs_acct_id(const struct lu_env *env, cfs_hash_t *hash,
135                            __u64 id, int delta, struct osd_thandle *oh)
136 {
137         struct osd_device       *osd = osd_dt_dev(oh->ot_super.th_dev);
138         struct zfs_id_change    *za;
139
140         LASSERT(hash);
141         LASSERT(oh->ot_tx);
142         LASSERT(oh->ot_tx->tx_txg == osd->od_known_txg);
143         LASSERT(osd->od_acct_delta != NULL);
144
145         za = lookup_or_create_by_id(osd, hash, id);
146         if (unlikely(za == NULL))
147                 return -ENOMEM;
148
149         atomic_add(delta, &za->zic_num);
150
151         return 0;
152 }
153
154 /*
155  * this function is used to maintain current state for given ID:
156  * at the beginning it initializes the cache from correspoding ZAP
157  */
158 static void osd_zfs_acct_cache_init(const struct lu_env *env,
159                                     struct osd_device *osd,
160                                     cfs_hash_t *hash, __u64 oid,
161                                     __u64 id, int delta,
162                                     struct osd_thandle *oh)
163 {
164         char                    *buf  = osd_oti_get(env)->oti_buf;
165         struct hlist_node       *hnode;
166         cfs_hash_bd_t            bd;
167         struct zfs_id_change    *za, *tmp;
168         __u64                    v;
169         int                      rc;
170
171         za = osd_zfs_lookup_by_id(hash, id);
172         if (likely(za != NULL))
173                 goto apply;
174
175         /*
176          * any concurrent thread is running in the same txg, so no on-disk
177          * accounting ZAP can be modified until this txg is closed
178          * thus all the concurrent threads must be getting the same value
179          * from that ZAP and we don't need to serialize lookups
180          */
181         snprintf(buf, sizeof(osd_oti_get(env)->oti_buf), "%llx", id);
182         /* XXX: we should be using zap_lookup_int_key(), but it consumes
183          *      20 bytes on the stack for buf .. */
184         rc = -zap_lookup(osd->od_objset.os, oid, buf, sizeof(uint64_t), 1, &v);
185         if (rc == -ENOENT) {
186                 v = 0;
187         } else if (unlikely(rc != 0)) {
188                 CERROR("%s: can't access accounting zap %llu\n",
189                        osd->od_svname, oid);
190                 return;
191         }
192
193         OBD_ALLOC_PTR(za);
194         if (unlikely(za == NULL)) {
195                 CERROR("%s: can't allocate za\n", osd->od_svname);
196                 return;
197         }
198
199         za->zic_id = id;
200         atomic_set(&za->zic_num, v);
201
202         cfs_hash_bd_get(hash, &id, &bd);
203         spin_lock(&osd->od_known_txg_lock);
204         hnode = cfs_hash_bd_findadd_locked(hash, &bd, &id, &za->zic_hash, 1);
205         LASSERT(hnode != NULL);
206         tmp = container_of0(hnode, struct zfs_id_change, zic_hash);
207         spin_unlock(&osd->od_known_txg_lock);
208
209         if (tmp == za) {
210                 /* our structure got into the hash */
211                 if (rc == -ENOENT) {
212                         /* there was no entry in ZAP yet, we have
213                          * to initialize with 0, so that accounting
214                          * reports can find that and then find our
215                          * cached value. */
216                         v = 0;
217                         rc = -zap_update(osd->od_objset.os, oid, buf,
218                                          sizeof(uint64_t), 1, &v, oh->ot_tx);
219                         if (unlikely(rc != 0))
220                                 CERROR("%s: can't initialize: rc = %d\n",
221                                        osd->od_svname, rc);
222                 }
223         } else {
224                 /* somebody won the race, we wasted the cycles */
225                 OBD_FREE_PTR(za);
226                 za = tmp;
227         }
228
229 apply:
230         LASSERT(za != NULL);
231         atomic_add(delta, &za->zic_num);
232 }
233
234 static __u32 acct_hashfn(cfs_hash_t *hash_body, const void *key, unsigned mask)
235 {
236         const __u64     *id = key;
237         __u32            result;
238
239         result = (__u32) *id;
240         return result % mask;
241 }
242
243 static void *acct_key(struct hlist_node *hnode)
244 {
245         struct zfs_id_change    *ac;
246
247         ac = hlist_entry(hnode, struct zfs_id_change, zic_hash);
248         return &ac->zic_id;
249 }
250
251 static int acct_hashkey_keycmp(const void *key,
252                                struct hlist_node *compared_hnode)
253 {
254         struct zfs_id_change    *ac;
255         const __u64             *id = key;
256
257         ac = hlist_entry(compared_hnode, struct zfs_id_change, zic_hash);
258         return *id == ac->zic_id;
259 }
260
261 static void *acct_hashobject(struct hlist_node *hnode)
262 {
263         return hlist_entry(hnode, struct zfs_id_change, zic_hash);
264 }
265
266 static cfs_hash_ops_t acct_hash_operations = {
267         .hs_hash        = acct_hashfn,
268         .hs_key         = acct_key,
269         .hs_keycmp      = acct_hashkey_keycmp,
270         .hs_object      = acct_hashobject,
271 };
272
273 #define ACCT_HASH_OPS (CFS_HASH_NO_LOCK|CFS_HASH_NO_ITEMREF|CFS_HASH_ADD_TAIL)
274
275 int osd_zfs_acct_init(const struct lu_env *env, struct osd_device *o)
276 {
277         int rc = 0;
278         ENTRY;
279
280         spin_lock_init(&o->od_known_txg_lock);
281
282         /* global structure representing current state for given ID */
283         o->od_acct_usr = cfs_hash_create("usr", 4, 4, 4, 0, 0, 0,
284                                          &acct_hash_operations,
285                                          ACCT_HASH_OPS);
286         if (o->od_acct_usr == NULL)
287                 GOTO(out, rc = -ENOMEM);
288
289         o->od_acct_grp = cfs_hash_create("grp", 4, 4, 4, 0, 0, 0,
290                                          &acct_hash_operations,
291                                          ACCT_HASH_OPS);
292         if (o->od_acct_grp == NULL)
293                 GOTO(out, rc = -ENOMEM);
294
295 out:
296         RETURN(rc);
297 }
298
299 static int osd_zfs_delete_item(cfs_hash_t *hs, cfs_hash_bd_t *bd,
300                                struct hlist_node *node, void *data)
301 {
302         struct hash_cbdata      *d = data;
303         struct zfs_id_change    *za;
304         __u64                    v;
305         char                     buf[12];
306         int                      rc;
307
308         za = hlist_entry(node, struct zfs_id_change, zic_hash);
309
310         /*
311          * XXX: should we try to fix accounting we failed to update before?
312          */
313 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 5, 70, 0)
314         /*
315          * extra checks to ensure our cache matches on-disk state
316          */
317         snprintf(buf, sizeof(buf), "%llx", za->zic_id);
318         rc = -zap_lookup(d->hcb_osd->od_objset.os, d->hcb_zapid,
319                          buf, sizeof(uint64_t), 1, &v);
320         /* pairs with zero value are removed by ZAP automatically */
321         if (rc == -ENOENT)
322                 v = 0;
323         if (atomic_read(&za->zic_num) != v) {
324                 CERROR("%s: INVALID ACCOUNTING FOR %llu %d != %lld: rc = %d\n",
325                        d->hcb_osd->od_svname, za->zic_id,
326                        atomic_read(&za->zic_num), v, rc);
327                 /* XXX: to catch with automated testing */
328                 LBUG();
329         }
330 #else
331 #warning "remove this additional check before release"
332 #endif
333
334         cfs_hash_bd_del_locked(hs, bd, node);
335         OBD_FREE_PTR(za);
336
337         return 0;
338 }
339
340 void osd_zfs_acct_fini(const struct lu_env *env, struct osd_device *o)
341 {
342         struct hash_cbdata      cbdata;
343
344         cbdata.hcb_osd = o;
345
346         /* release object accounting cache (owners) */
347         cbdata.hcb_zapid = o->od_iusr_oid;
348
349         if (o->od_acct_usr) {
350                 cfs_hash_for_each_safe(o->od_acct_usr, osd_zfs_delete_item,
351                                        &cbdata);
352                 cfs_hash_putref(o->od_acct_usr);
353                 o->od_acct_usr = NULL;
354         }
355
356         /* release object accounting cache (groups) */
357         cbdata.hcb_zapid = o->od_igrp_oid;
358
359         if (o->od_acct_grp) {
360                 cfs_hash_for_each_safe(o->od_acct_grp, osd_zfs_delete_item,
361                                        &cbdata);
362                 cfs_hash_putref(o->od_acct_grp);
363                 o->od_acct_grp = NULL;
364         }
365 }
366
367 static int osd_zfs_commit_item(cfs_hash_t *hs, cfs_hash_bd_t *bd,
368                                struct hlist_node *node, void *data)
369 {
370         struct hash_cbdata      *d = data;
371         struct osd_device       *osd = d->hcb_osd;
372         struct zfs_id_change    *za;
373         int                      rc;
374
375         za = hlist_entry(node, struct zfs_id_change, zic_hash);
376
377         rc = -zap_increment_int(osd->od_objset.os, d->hcb_zapid, za->zic_id,
378                                 atomic_read(&za->zic_num), d->hcb_tx);
379         if (unlikely(rc != 0))
380                 CERROR("%s: quota update for UID "LPU64" failed: rc = %d\n",
381                        osd->od_svname, za->zic_id, rc);
382
383         cfs_hash_bd_del_locked(hs, bd, node);
384         OBD_FREE_PTR(za);
385
386         return 0;
387 }
388
389 /*
390  * this function is called as part of txg commit procedure,
391  * no more normal changes are allowed to this txg.
392  * we go over all the changes cached in per-txg structure
393  * and apply them to actual ZAPs
394  */
395 #ifdef HAVE_DSL_SYNC_TASK_DO_NOWAIT
396 static void osd_zfs_acct_update(void *arg, void *arg2, dmu_tx_t *tx)
397 #else
398 static void osd_zfs_acct_update(void *arg, dmu_tx_t *tx)
399 #endif
400 {
401         struct osd_zfs_acct_txg *zat = arg;
402         struct osd_device       *osd = zat->zat_osd;
403         struct hash_cbdata       cbdata;
404
405         cbdata.hcb_osd = osd;
406         cbdata.hcb_tx = tx;
407
408         CDEBUG(D_OTHER, "COMMIT %llu on %s\n", tx->tx_txg, osd->od_svname);
409
410         /* apply changes related to the owners */
411         cbdata.hcb_zapid = osd->od_iusr_oid;
412         cfs_hash_for_each_safe(zat->zat_usr, osd_zfs_commit_item, &cbdata);
413
414         /* apply changes related to the groups */
415         cbdata.hcb_zapid = osd->od_igrp_oid;
416         cfs_hash_for_each_safe(zat->zat_grp, osd_zfs_commit_item, &cbdata);
417
418         cfs_hash_putref(zat->zat_usr);
419         cfs_hash_putref(zat->zat_grp);
420
421         OBD_FREE_PTR(zat);
422 }
423
424 #ifdef HAVE_DSL_SYNC_TASK_DO_NOWAIT
425 #define dsl_sync_task_nowait(pool, func, arg, blocks, tx) \
426         dsl_sync_task_do_nowait(pool, NULL, func, arg, NULL, blocks, tx)
427 #endif
428
429 /*
430  * if any change to the object accounting is going to happen,
431  * we create one structure per txg to track all the changes
432  * and register special routine to be called as part of txg
433  * commit procedure.
434  */
435 int osd_zfs_acct_trans_start(const struct lu_env *env, struct osd_thandle *oh)
436 {
437         struct osd_device       *osd = osd_dt_dev(oh->ot_super.th_dev);
438         struct osd_zfs_acct_txg *ac = NULL;
439         int                      rc = 0, add_work = 0;
440
441         if (likely(oh->ot_tx->tx_txg == osd->od_known_txg)) {
442                 /* already created */
443                 return 0;
444         }
445
446         OBD_ALLOC_PTR(ac);
447         if (unlikely(ac == NULL))
448                 return -ENOMEM;
449
450         ac->zat_usr = cfs_hash_create("usr", 4, 4, 4, 0, 0, 0,
451                                       &acct_hash_operations,
452                                       ACCT_HASH_OPS);
453         if (unlikely(ac->zat_usr == NULL)) {
454                 CERROR("%s: can't allocate hash for accounting\n",
455                         osd->od_svname);
456                 GOTO(out, rc = -ENOMEM);
457         }
458
459         ac->zat_grp = cfs_hash_create("grp", 4, 4, 4, 0, 0, 0,
460                                       &acct_hash_operations,
461                                       ACCT_HASH_OPS);
462         if (unlikely(ac->zat_grp == NULL)) {
463                 CERROR("%s: can't allocate hash for accounting\n",
464                         osd->od_svname);
465                 GOTO(out, rc = -ENOMEM);
466         }
467
468         spin_lock(&osd->od_known_txg_lock);
469         if (oh->ot_tx->tx_txg != osd->od_known_txg) {
470                 osd->od_acct_delta = ac;
471                 osd->od_known_txg = oh->ot_tx->tx_txg;
472                 add_work = 1;
473         }
474         spin_unlock(&osd->od_known_txg_lock);
475
476         /* schedule a callback to be run in the context of txg
477          * once the latter is closed and syncing */
478         if (add_work) {
479                 spa_t *spa = dmu_objset_spa(osd->od_objset.os);
480                 LASSERT(ac->zat_osd == NULL);
481                 ac->zat_osd = osd;
482                 dsl_sync_task_nowait(spa_get_dsl(spa),
483                                         osd_zfs_acct_update,
484                                         ac, 128, oh->ot_tx);
485
486                 /* no to be freed now */
487                 ac = NULL;
488         }
489
490 out:
491         if (ac != NULL) {
492                 /* another thread has installed new structure already */
493                 if (ac->zat_usr)
494                         cfs_hash_putref(ac->zat_usr);
495                 if (ac->zat_grp)
496                         cfs_hash_putref(ac->zat_grp);
497                 OBD_FREE_PTR(ac);
498         }
499
500         return rc;
501 }
502
503 void osd_zfs_acct_uid(const struct lu_env *env, struct osd_device *osd,
504                       __u64 uid, int delta, struct osd_thandle *oh)
505 {
506         int rc;
507
508         /* add per-txg job to update accounting */
509         rc = osd_zfs_acct_trans_start(env, oh);
510         if (unlikely(rc != 0))
511                 return;
512
513         /* maintain per-OSD cached value */
514         osd_zfs_acct_cache_init(env, osd, osd->od_acct_usr,
515                                 osd->od_iusr_oid, uid, delta, oh);
516
517         /* maintain per-TXG delta */
518         osd_zfs_acct_id(env, osd->od_acct_delta->zat_usr, uid, delta, oh);
519
520 }
521
522 void osd_zfs_acct_gid(const struct lu_env *env, struct osd_device *osd,
523                       __u64 gid, int delta, struct osd_thandle *oh)
524 {
525         int rc;
526
527         /* add per-txg job to update accounting */
528         rc = osd_zfs_acct_trans_start(env, oh);
529         if (unlikely(rc != 0))
530                 return;
531
532         /* maintain per-OSD cached value */
533         osd_zfs_acct_cache_init(env, osd, osd->od_acct_grp,
534                                 osd->od_igrp_oid, gid, delta, oh);
535
536         /* maintain per-TXG delta */
537         osd_zfs_acct_id(env, osd->od_acct_delta->zat_grp, gid, delta, oh);
538 }
539
540 /**
541  * Space Accounting Management
542  */
543
544 /**
545  * Return space usage consumed by a given uid or gid.
546  * Block usage is accurrate since it is maintained by DMU itself.
547  * However, DMU does not provide inode accounting, so the #inodes in use
548  * is estimated from the block usage and statfs information.
549  *
550  * \param env   - is the environment passed by the caller
551  * \param dtobj - is the accounting object
552  * \param dtrec - is the record to fill with space usage information
553  * \param dtkey - is the id the of the user or group for which we would
554  *                like to access disk usage.
555  * \param capa - is the capability, not used.
556  *
557  * \retval +ve - success : exact match
558  * \retval -ve - failure
559  */
560 static int osd_acct_index_lookup(const struct lu_env *env,
561                                  struct dt_object *dtobj,
562                                  struct dt_rec *dtrec,
563                                  const struct dt_key *dtkey,
564                                  struct lustre_capa *capa)
565 {
566         struct osd_thread_info  *info = osd_oti_get(env);
567         char                    *buf  = info->oti_buf;
568         struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
569         struct osd_object       *obj = osd_dt_obj(dtobj);
570         struct osd_device       *osd = osd_obj2dev(obj);
571         uint64_t                 oid;
572         struct zfs_id_change    *za = NULL;
573         int                      rc;
574         ENTRY;
575
576         rec->bspace = rec->ispace = 0;
577
578         /* convert the 64-bit uid/gid into a string */
579         sprintf(buf, "%llx", *((__u64 *)dtkey));
580         /* fetch DMU object ID (DMU_USERUSED_OBJECT/DMU_GROUPUSED_OBJECT) to be
581          * used */
582         oid = osd_quota_fid2dmu(lu_object_fid(&dtobj->do_lu));
583
584         /* disk usage (in bytes) is maintained by DMU.
585          * DMU_USERUSED_OBJECT/DMU_GROUPUSED_OBJECT are special objects which
586          * not associated with any dmu_but_t (see dnode_special_open()).
587          * As a consequence, we cannot use udmu_zap_lookup() here since it
588          * requires a valid oo_db. */
589         rc = -zap_lookup(osd->od_objset.os, oid, buf, sizeof(uint64_t), 1,
590                         &rec->bspace);
591         if (rc == -ENOENT)
592                 /* user/group has not created anything yet */
593                 CDEBUG(D_QUOTA, "%s: id %s not found in DMU accounting ZAP\n",
594                        osd->od_svname, buf);
595         else if (rc)
596                 RETURN(rc);
597
598         if (osd->od_quota_iused_est) {
599                 if (rec->bspace != 0)
600                         /* estimate #inodes in use */
601                         rec->ispace = udmu_objset_user_iused(&osd->od_objset,
602                                                              rec->bspace);
603                 RETURN(+1);
604         }
605
606         /* as for inode accounting, it is not maintained by DMU, so we just
607          * use our own ZAP to track inode usage */
608         if (oid == DMU_USERUSED_OBJECT) {
609                 za = osd_zfs_lookup_by_id(osd->od_acct_usr,
610                                          *((__u64 *)dtkey));
611         } else if (oid == DMU_GROUPUSED_OBJECT) {
612                 za = osd_zfs_lookup_by_id(osd->od_acct_grp,
613                                          *((__u64 *)dtkey));
614         }
615         if (za) {
616                 rec->ispace = atomic_read(&za->zic_num);
617         } else {
618                 rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
619                                 buf, sizeof(uint64_t), 1, &rec->ispace);
620         }
621
622         if (rc == -ENOENT)
623                 /* user/group has not created any file yet */
624                 CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
625                        osd->od_svname, buf);
626         else if (rc)
627                 RETURN(rc);
628
629         RETURN(+1);
630 }
631
632 /**
633  * Initialize osd Iterator for given osd index object.
634  *
635  * \param  dt    - osd index object
636  * \param  attr  - not used
637  * \param  capa  - BYPASS_CAPA
638  */
639 static struct dt_it *osd_it_acct_init(const struct lu_env *env,
640                                       struct dt_object *dt,
641                                       __u32 attr,
642                                       struct lustre_capa *capa)
643 {
644         struct osd_thread_info  *info = osd_oti_get(env);
645         struct osd_it_quota     *it;
646         struct lu_object        *lo   = &dt->do_lu;
647         struct osd_device       *osd  = osd_dev(lo->lo_dev);
648         int                      rc;
649         ENTRY;
650
651         LASSERT(lu_object_exists(lo));
652
653         if (info == NULL)
654                 RETURN(ERR_PTR(-ENOMEM));
655
656         it = &info->oti_it_quota;
657         memset(it, 0, sizeof(*it));
658         it->oiq_oid = osd_quota_fid2dmu(lu_object_fid(lo));
659
660         if (it->oiq_oid == DMU_GROUPUSED_OBJECT)
661                 it->oiq_hash = osd->od_acct_grp;
662         else if (it->oiq_oid == DMU_USERUSED_OBJECT)
663                 it->oiq_hash = osd->od_acct_usr;
664         else
665                 LBUG();
666
667         /* initialize zap cursor */
668         rc = -udmu_zap_cursor_init(&it->oiq_zc, &osd->od_objset, it->oiq_oid,0);
669         if (rc)
670                 RETURN(ERR_PTR(rc));
671
672         /* take object reference */
673         lu_object_get(lo);
674         it->oiq_obj   = osd_dt_obj(dt);
675         it->oiq_reset = 1;
676
677         RETURN((struct dt_it *)it);
678 }
679
680 /**
681  * Free given iterator.
682  *
683  * \param  di   - osd iterator
684  */
685 static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
686 {
687         struct osd_it_quota *it = (struct osd_it_quota *)di;
688         ENTRY;
689         udmu_zap_cursor_fini(it->oiq_zc);
690         lu_object_put(env, &it->oiq_obj->oo_dt.do_lu);
691         EXIT;
692 }
693
694 /**
695  * Move on to the next valid entry.
696  *
697  * \param  di   - osd iterator
698  *
699  * \retval +ve  - iterator reached the end
700  * \retval   0  - iterator has not reached the end yet
701  * \retval -ve  - unexpected failure
702  */
703 static int osd_it_acct_next(const struct lu_env *env, struct dt_it *di)
704 {
705         struct osd_it_quota     *it = (struct osd_it_quota *)di;
706         int                      rc;
707         ENTRY;
708
709         if (it->oiq_reset == 0)
710                 zap_cursor_advance(it->oiq_zc);
711         it->oiq_reset = 0;
712         rc = -udmu_zap_cursor_retrieve_key(env, it->oiq_zc, NULL, 32);
713         if (rc == -ENOENT) /* reached the end */
714                 RETURN(+1);
715         RETURN(rc);
716 }
717
718 /**
719  * Return pointer to the key under iterator.
720  *
721  * \param  di   - osd iterator
722  */
723 static struct dt_key *osd_it_acct_key(const struct lu_env *env,
724                                       const struct dt_it *di)
725 {
726         struct osd_it_quota     *it = (struct osd_it_quota *)di;
727         struct osd_thread_info  *info = osd_oti_get(env);
728         char                    *buf  = info->oti_buf;
729         char                    *p;
730         int                      rc;
731         ENTRY;
732
733         it->oiq_reset = 0;
734         rc = -udmu_zap_cursor_retrieve_key(env, it->oiq_zc, buf, 32);
735         if (rc)
736                 RETURN(ERR_PTR(rc));
737         it->oiq_id = simple_strtoull(buf, &p, 16);
738         RETURN((struct dt_key *) &it->oiq_id);
739 }
740
741 /**
742  * Return size of key under iterator (in bytes)
743  *
744  * \param  di   - osd iterator
745  */
746 static int osd_it_acct_key_size(const struct lu_env *env,
747                                 const struct dt_it *di)
748 {
749         ENTRY;
750         RETURN((int)sizeof(uint64_t));
751 }
752
753 /**
754  * Return pointer to the record under iterator.
755  *
756  * \param  di    - osd iterator
757  * \param  attr  - not used
758  */
759 static int osd_it_acct_rec(const struct lu_env *env,
760                            const struct dt_it *di,
761                            struct dt_rec *dtrec, __u32 attr)
762 {
763         struct osd_thread_info  *info = osd_oti_get(env);
764         char                    *buf  = info->oti_buf;
765         struct osd_it_quota     *it = (struct osd_it_quota *)di;
766         struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
767         struct osd_object       *obj = it->oiq_obj;
768         struct osd_device       *osd = osd_obj2dev(obj);
769         int                      bytes_read;
770         struct zfs_id_change    *za;
771         int                      rc;
772         ENTRY;
773
774         it->oiq_reset = 0;
775         rec->ispace = rec->bspace = 0;
776
777         /* retrieve block usage from the DMU accounting object */
778         rc = -udmu_zap_cursor_retrieve_value(env, it->oiq_zc,
779                                              (char *)&rec->bspace,
780                                              sizeof(uint64_t), &bytes_read);
781         if (rc)
782                 RETURN(rc);
783
784         if (osd->od_quota_iused_est) {
785                 if (rec->bspace != 0)
786                         /* estimate #inodes in use */
787                         rec->ispace = udmu_objset_user_iused(&osd->od_objset,
788                                                              rec->bspace);
789                 RETURN(0);
790         }
791
792         /* retrieve key associated with the current cursor */
793         rc = -udmu_zap_cursor_retrieve_key(env, it->oiq_zc, buf, 32);
794         if (rc)
795                 RETURN(rc);
796
797         /* inode accounting is not maintained by DMU, so we use our own ZAP to
798          * track inode usage */
799         za = osd_zfs_lookup_by_id(it->oiq_hash, it->oiq_id);
800         if (za != NULL) {
801                 /* found in the cache */
802                 rec->ispace = atomic_read(&za->zic_num);
803         } else {
804                  rc = -zap_lookup(osd->od_objset.os,
805                                   it->oiq_obj->oo_db->db_object,
806                                   buf, sizeof(uint64_t), 1, &rec->ispace);
807                  if (rc == -ENOENT) {
808                         /* user/group has not created any file yet */
809                         CDEBUG(D_QUOTA, "%s: id %s not found in ZAP\n",
810                                osd->od_svname, buf);
811                         rc = 0;
812                 }
813         }
814
815         RETURN(rc);
816 }
817
818 /**
819  * Returns cookie for current Iterator position.
820  *
821  * \param  di    - osd iterator
822  */
823 static __u64 osd_it_acct_store(const struct lu_env *env,
824                                const struct dt_it *di)
825 {
826         struct osd_it_quota *it = (struct osd_it_quota *)di;
827         ENTRY;
828         it->oiq_reset = 0;
829         RETURN(udmu_zap_cursor_serialize(it->oiq_zc));
830 }
831
832 /**
833  * Restore iterator from cookie. if the \a hash isn't found,
834  * restore the first valid record.
835  *
836  * \param  di    - osd iterator
837  * \param  hash  - iterator location cookie
838  *
839  * \retval +ve  - di points to exact matched key
840  * \retval  0   - di points to the first valid record
841  * \retval -ve  - failure
842  */
843 static int osd_it_acct_load(const struct lu_env *env,
844                             const struct dt_it *di, __u64 hash)
845 {
846         struct osd_it_quota     *it  = (struct osd_it_quota *)di;
847         struct osd_device       *osd = osd_obj2dev(it->oiq_obj);
848         zap_cursor_t            *zc;
849         int                      rc;
850         ENTRY;
851
852         /* create new cursor pointing to the new hash */
853         rc = -udmu_zap_cursor_init(&zc, &osd->od_objset, it->oiq_oid, hash);
854         if (rc)
855                 RETURN(rc);
856         udmu_zap_cursor_fini(it->oiq_zc);
857         it->oiq_zc = zc;
858         it->oiq_reset = 0;
859
860         rc = -udmu_zap_cursor_retrieve_key(env, it->oiq_zc, NULL, 32);
861         if (rc == 0)
862                 RETURN(+1);
863         else if (rc == -ENOENT)
864                 RETURN(0);
865         RETURN(rc);
866 }
867
868 /**
869  * Move Iterator to record specified by \a key, if the \a key isn't found,
870  * move to the first valid record.
871  *
872  * \param  di   - osd iterator
873  * \param  key  - uid or gid
874  *
875  * \retval +ve  - di points to exact matched key
876  * \retval 0    - di points to the first valid record
877  * \retval -ve  - failure
878  */
879 static int osd_it_acct_get(const struct lu_env *env, struct dt_it *di,
880                 const struct dt_key *key)
881 {
882         ENTRY;
883
884         /* XXX: like osd_zap_it_get(), API is currently broken */
885         LASSERT(*((__u64 *)key) == 0);
886
887         RETURN(osd_it_acct_load(env, di, 0));
888 }
889
890 /**
891  * Release Iterator
892  *
893  * \param  di   - osd iterator
894  */
895 static void osd_it_acct_put(const struct lu_env *env, struct dt_it *di)
896 {
897 }
898
899 /**
900  * Index and Iterator operations for accounting objects
901  */
902 const struct dt_index_operations osd_acct_index_ops = {
903         .dio_lookup = osd_acct_index_lookup,
904         .dio_it     = {
905                 .init           = osd_it_acct_init,
906                 .fini           = osd_it_acct_fini,
907                 .get            = osd_it_acct_get,
908                 .put            = osd_it_acct_put,
909                 .next           = osd_it_acct_next,
910                 .key            = osd_it_acct_key,
911                 .key_size       = osd_it_acct_key_size,
912                 .rec            = osd_it_acct_rec,
913                 .store          = osd_it_acct_store,
914                 .load           = osd_it_acct_load
915         }
916 };
917
918 /**
919  * Quota Enforcement Management
920  */
921
922 /*
923  * Wrapper for qsd_op_begin().
924  *
925  * \param env    - the environment passed by the caller
926  * \param osd    - is the osd_device
927  * \param uid    - user id of the inode
928  * \param gid    - group id of the inode
929  * \param space  - how many blocks/inodes will be consumed/released
930  * \param oh     - osd transaction handle
931  * \param is_blk - block quota or inode quota?
932  * \param flags  - if the operation is write, return no user quota, no
933  *                  group quota, or sync commit flags to the caller
934  * \param force  - set to 1 when changes are performed by root user and thus
935  *                  can't failed with EDQUOT
936  *
937  * \retval 0      - success
938  * \retval -ve    - failure
939  */
940 int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
941                       qid_t uid, qid_t gid, long long space,
942                       struct osd_thandle *oh, bool is_blk, int *flags,
943                       bool force)
944 {
945         struct osd_thread_info  *info = osd_oti_get(env);
946         struct lquota_id_info   *qi = &info->oti_qi;
947         struct qsd_instance     *qsd = osd->od_quota_slave;
948         int                      rcu, rcg; /* user & group rc */
949         ENTRY;
950
951         if (unlikely(qsd == NULL))
952                 /* quota slave instance hasn't been allocated yet */
953                 RETURN(0);
954
955         /* let's start with user quota */
956         qi->lqi_id.qid_uid = uid;
957         qi->lqi_type       = USRQUOTA;
958         qi->lqi_space      = space;
959         qi->lqi_is_blk     = is_blk;
960         rcu = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
961
962         if (force && (rcu == -EDQUOT || rcu == -EINPROGRESS))
963                 /* ignore EDQUOT & EINPROGRESS when changes are done by root */
964                 rcu = 0;
965
966         /* For non-fatal error, we want to continue to get the noquota flags
967          * for group id. This is only for commit write, which has @flags passed
968          * in. See osd_declare_write_commit().
969          * When force is set to true, we also want to proceed with the gid */
970         if (rcu && (rcu != -EDQUOT || flags == NULL))
971                 RETURN(rcu);
972
973         /* and now group quota */
974         qi->lqi_id.qid_gid = gid;
975         qi->lqi_type       = GRPQUOTA;
976         rcg = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
977
978         if (force && (rcg == -EDQUOT || rcg == -EINPROGRESS))
979                 /* as before, ignore EDQUOT & EINPROGRESS for root */
980                 rcg = 0;
981
982         RETURN(rcu ? rcu : rcg);
983 }