Whamcloud - gitweb
830e9cf4a28da0b8dd3c203f554c3e17a0acceb5
[fs/lustre-release.git] / lustre / quota / lquota_entry.c
1 /* GPL HEADER START
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License version 2 only,
7  * as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License version 2 for more details (a copy is included
13  * in the LICENSE file that accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License
16  * version 2 along with this program; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 021110-1307, USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2011, 2012, Intel, Inc.
24  * Use is subject to license terms.
25  *
26  * Author: Johann Lombardi <johann.lombardi@intel.com>
27  * Author: Niu    Yawei    <yawei.niu@intel.com>
28  */
29
30 #ifndef EXPORT_SYMTAB
31 # define EXPORT_SYMTAB
32 #endif
33
34 #define DEBUG_SUBSYSTEM S_LQUOTA
35
36 #include <linux/module.h>
37 #include <linux/slab.h>
38 #include <obd_class.h>
39 #include "lquota_internal.h"
40
41 static int hash_lqs_cur_bits = HASH_LQE_CUR_BITS;
42 CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444,
43                 "the current bits of lqe hash");
44 cfs_mem_cache_t *lqe_kmem;
45
46 static unsigned lqe64_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask)
47 {
48         return cfs_hash_u64_hash(*((__u64 *)key), mask);
49 }
50
51 static void *lqe64_hash_key(cfs_hlist_node_t *hnode)
52 {
53         struct lquota_entry *lqe;
54         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
55         return &lqe->lqe_id.qid_uid;
56 }
57
58 static int lqe64_hash_keycmp(const void *key, cfs_hlist_node_t *hnode)
59 {
60         struct lquota_entry *lqe;
61         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
62         return (lqe->lqe_id.qid_uid == *((__u64*)key));
63 }
64
65 static void *lqe_hash_object(cfs_hlist_node_t *hnode)
66 {
67         return cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
68 }
69
70 static void lqe_hash_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
71 {
72         struct lquota_entry *lqe;
73         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
74         lqe_getref(lqe);
75 }
76
77 static void lqe_hash_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
78 {
79         struct lquota_entry *lqe;
80         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
81         lqe_putref(lqe);
82 }
83
84 static void lqe_hash_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
85 {
86         CERROR("Should not have any item left!\n");
87 }
88
89 /* lqe hash methods for 64-bit uid/gid, new hash functions would have to be
90  * defined for per-directory quota relying on a 128-bit FID */
91 static cfs_hash_ops_t lqe64_hash_ops = {
92         .hs_hash       = lqe64_hash_hash,
93         .hs_key        = lqe64_hash_key,
94         .hs_keycmp     = lqe64_hash_keycmp,
95         .hs_object     = lqe_hash_object,
96         .hs_get        = lqe_hash_get,
97         .hs_put_locked = lqe_hash_put_locked,
98         .hs_exit       = lqe_hash_exit
99 };
100
101 /* Logging helper function */
102 void lquota_lqe_debug0(struct lquota_entry *lqe,
103                        struct libcfs_debug_msg_data *msgdata,
104                        const char *fmt, ...)
105 {
106         struct lquota_site *site = lqe->lqe_site;
107         va_list args;
108
109         LASSERT(site->lqs_ops->lqe_debug != NULL);
110
111         va_start(args, fmt);
112         site->lqs_ops->lqe_debug(lqe, site->lqs_parent, msgdata, fmt, args);
113         va_end(args);
114 }
115
116 struct lqe_iter_data {
117         unsigned long   lid_inuse;
118         unsigned long   lid_freed;
119         bool            lid_free_all;
120 };
121
122 static int lqe_iter_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
123                        cfs_hlist_node_t *hnode, void *data)
124 {
125         struct lqe_iter_data *d = (struct lqe_iter_data *)data;
126         struct lquota_entry  *lqe;
127
128         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
129         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
130
131         /* Only one reference held by hash table, and nobody else can
132          * grab the entry at this moment, it's safe to remove it from
133          * the hash and free it. */
134         if (atomic_read(&lqe->lqe_ref) == 1) {
135                 if (!lqe_is_master(lqe)) {
136                         LASSERT(lqe->lqe_pending_write == 0);
137                         LASSERT(lqe->lqe_pending_req == 0);
138                 }
139                 if (d->lid_free_all || lqe->lqe_enforced) {
140                         d->lid_freed++;
141                         cfs_hash_bd_del_locked(hs, bd, hnode);
142                         return 0;
143                 }
144         }
145         d->lid_inuse++;
146
147         if (d->lid_free_all)
148                 LQUOTA_ERROR(lqe, "Inuse quota entry");
149         return 0;
150 }
151
152 /**
153  * Cleanup the entries in the hashtable
154  *
155  * \param hash     - hash table which stores quota entries
156  * \param free_all - free all entries or only free the entries
157  *                   without quota enforce ?
158  */
159 static void lqe_cleanup(cfs_hash_t *hash, bool free_all)
160 {
161         struct lqe_iter_data    d;
162         int                     repeat = 0;
163         ENTRY;
164 retry:
165         memset(&d, 0, sizeof(d));
166         d.lid_free_all = free_all;
167
168         cfs_hash_for_each_safe(hash, lqe_iter_cb, &d);
169
170         /* In most case, when this function is called on master or
171          * slave finalization, there should be no inuse quota entry.
172          *
173          * If the per-fs quota updating thread is still holding
174          * some entries, we just wait for it's finished. */
175         if (free_all && d.lid_inuse) {
176                 CDEBUG(D_QUOTA, "Hash:%p has entries inuse: inuse:%lu, "
177                         "freed:%lu, repeat:%u\n", hash,
178                         d.lid_inuse, d.lid_freed, repeat);
179                 repeat++;
180                 cfs_schedule_timeout_and_set_state(CFS_TASK_INTERRUPTIBLE,
181                                                 cfs_time_seconds(1));
182                 goto retry;
183         }
184         EXIT;
185 }
186
187 /*
188  * Allocate a new lquota site.
189  *
190  * \param env    - the environment passed by the caller
191  * \param parent - is a pointer to the parent structure, either a qmt_pool_info
192  *                 structure on the master or a qsd_qtype_info structure on the
193  *                 slave.
194  * \param is_master - is set when the site belongs to a QMT.
195  * \param qtype     - is the quota type managed by this site
196  * \param ops       - is the quota entry operation vector to be used for quota
197  *                    entry belonging to this site.
198  *
199  * \retval 0     - success
200  * \retval -ve   - failure
201  */
202 struct lquota_site *lquota_site_alloc(const struct lu_env *env, void *parent,
203                                       bool is_master, short qtype,
204                                       struct lquota_entry_operations *ops)
205 {
206         struct lquota_site      *site;
207         char                     hashname[15];
208         ENTRY;
209
210         LASSERT(qtype < MAXQUOTAS);
211
212         OBD_ALLOC_PTR(site);
213         if (site == NULL)
214                 RETURN(ERR_PTR(-ENOMEM));
215
216         /* assign parameters */
217         site->lqs_qtype  = qtype;
218         site->lqs_parent = parent;
219         site->lqs_is_mst = is_master;
220         site->lqs_ops    = ops;
221
222         /* allocate hash table */
223         memset(hashname, 0, sizeof(hashname));
224         sprintf(hashname, "LQUOTA_HASH%u", qtype);
225         site->lqs_hash= cfs_hash_create(hashname, hash_lqs_cur_bits,
226                                         HASH_LQE_MAX_BITS,
227                                         min(hash_lqs_cur_bits,
228                                             HASH_LQE_BKT_BITS),
229                                         0, CFS_HASH_MIN_THETA,
230                                         CFS_HASH_MAX_THETA, &lqe64_hash_ops,
231                                         CFS_HASH_DEFAULT|CFS_HASH_BIGNAME);
232         if (site->lqs_hash == NULL) {
233                 OBD_FREE_PTR(site);
234                 RETURN(ERR_PTR(-ENOMEM));
235         }
236
237         RETURN(site);
238 }
239
240 /*
241  * Destroy a lquota site.
242  *
243  * \param env  - the environment passed by the caller
244  * \param site - lquota site to be destroyed
245  *
246  * \retval 0     - success
247  * \retval -ve   - failure
248  */
249 void lquota_site_free(const struct lu_env *env, struct lquota_site *site)
250 {
251         /* cleanup hash table */
252         lqe_cleanup(site->lqs_hash, true);
253         cfs_hash_putref(site->lqs_hash);
254
255         site->lqs_parent = NULL;
256         OBD_FREE_PTR(site);
257 }
258
259 /*
260  * Initialize qsd/qmt-specific fields of quota entry.
261  *
262  * \param lqe - is the quota entry to initialize
263  */
264 static void lqe_init(struct lquota_entry *lqe)
265 {
266         struct lquota_site *site;
267         ENTRY;
268
269         LASSERT(lqe != NULL);
270         site = lqe->lqe_site;
271         LASSERT(site != NULL);
272         LASSERT(site->lqs_ops->lqe_init != NULL);
273
274         LQUOTA_DEBUG(lqe, "init");
275
276         site->lqs_ops->lqe_init(lqe, site->lqs_parent);
277 }
278
279 /*
280  * Update a lquota entry. This is done by reading quota settings from the
281  * on-disk index. The lquota entry must be write locked.
282  *
283  * \param env - the environment passed by the caller
284  * \param lqe - is the quota entry to refresh
285  */
286 static int lqe_read(const struct lu_env *env, struct lquota_entry *lqe)
287 {
288         struct lquota_site      *site;
289         int                      rc;
290         ENTRY;
291
292         LASSERT(lqe != NULL);
293         site = lqe->lqe_site;
294         LASSERT(site != NULL);
295         LASSERT(site->lqs_ops->lqe_read != NULL);
296
297         LQUOTA_DEBUG(lqe, "read");
298
299         rc = site->lqs_ops->lqe_read(env, lqe, site->lqs_parent);
300         if (rc == 0)
301                 /* mark the entry as up-to-date */
302                 lqe->lqe_uptodate = true;
303
304         RETURN(rc);
305 }
306
307 /*
308  * Find or create a quota entry.
309  *
310  * \param env  - the environment passed by the caller
311  * \param site - lquota site which stores quota entries in a hash table
312  * \param qid  - is the quota ID to be found/created
313  *
314  * \retval 0     - success
315  * \retval -ve   - failure
316  */
317 struct lquota_entry *lqe_locate(const struct lu_env *env,
318                                 struct lquota_site *site, union lquota_id *qid)
319 {
320         struct lquota_entry     *lqe, *new = NULL;
321         int                      rc = 0;
322         ENTRY;
323
324         lqe = cfs_hash_lookup(site->lqs_hash, (void *)&qid->qid_uid);
325         if (lqe != NULL) {
326                 LASSERT(lqe->lqe_uptodate);
327                 RETURN(lqe);
328         }
329
330         OBD_SLAB_ALLOC_PTR_GFP(new, lqe_kmem, CFS_ALLOC_IO);
331         if (new == NULL) {
332                 CERROR("Fail to allocate lqe for id:"LPU64", "
333                         "hash:%s\n", qid->qid_uid, site->lqs_hash->hs_name);
334                 RETURN(ERR_PTR(-ENOMEM));
335         }
336
337         atomic_set(&new->lqe_ref, 1); /* hold 1 for caller */
338         new->lqe_id     = *qid;
339         new->lqe_site   = site;
340         CFS_INIT_LIST_HEAD(&new->lqe_link);
341
342         /* quota settings need to be updated from disk, that's why
343          * lqe->lqe_uptodate isn't set yet */
344         new->lqe_uptodate = false;
345
346         /* perform qmt/qsd specific initialization */
347         lqe_init(new);
348
349         /* read quota settings from disk and mark lqe as up-to-date */
350         rc = lqe_read(env, new);
351         if (rc)
352                 GOTO(out, lqe = ERR_PTR(rc));
353
354         /* add new entry to hash */
355         lqe = cfs_hash_findadd_unique(site->lqs_hash, &new->lqe_id.qid_uid,
356                                       &new->lqe_hash);
357         if (lqe == new)
358                 new = NULL;
359 out:
360         if (new)
361                 lqe_putref(new);
362         RETURN(lqe);
363 }