Whamcloud - gitweb
16f703da866b571f7228908431ae290f529bb59a
[fs/lustre-release.git] / lustre / quota / lquota_entry.c
1 /* GPL HEADER START
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License version 2 only,
7  * as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License version 2 for more details (a copy is included
13  * in the LICENSE file that accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License
16  * version 2 along with this program; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 021110-1307, USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, Intel Corporation.
24  * Use is subject to license terms.
25  *
26  * Author: Johann Lombardi <johann.lombardi@intel.com>
27  * Author: Niu    Yawei    <yawei.niu@intel.com>
28  */
29
30 #define DEBUG_SUBSYSTEM S_LQUOTA
31
32 #include <linux/module.h>
33 #include <linux/slab.h>
34 #include <obd_class.h>
35 #include "lquota_internal.h"
36
37 static int hash_lqs_cur_bits = HASH_LQE_CUR_BITS;
38 CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444,
39                 "the current bits of lqe hash");
40
41 static unsigned lqe64_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask)
42 {
43         return cfs_hash_u64_hash(*((__u64 *)key), mask);
44 }
45
46 static void *lqe64_hash_key(cfs_hlist_node_t *hnode)
47 {
48         struct lquota_entry *lqe;
49         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
50         return &lqe->lqe_id.qid_uid;
51 }
52
53 static int lqe64_hash_keycmp(const void *key, cfs_hlist_node_t *hnode)
54 {
55         struct lquota_entry *lqe;
56         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
57         return (lqe->lqe_id.qid_uid == *((__u64*)key));
58 }
59
60 static void *lqe_hash_object(cfs_hlist_node_t *hnode)
61 {
62         return cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
63 }
64
65 static void lqe_hash_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
66 {
67         struct lquota_entry *lqe;
68         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
69         lqe_getref(lqe);
70 }
71
72 static void lqe_hash_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
73 {
74         struct lquota_entry *lqe;
75         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
76         lqe_putref(lqe);
77 }
78
79 static void lqe_hash_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
80 {
81         CERROR("Should not have any item left!\n");
82 }
83
84 /* lqe hash methods for 64-bit uid/gid, new hash functions would have to be
85  * defined for per-directory quota relying on a 128-bit FID */
86 static cfs_hash_ops_t lqe64_hash_ops = {
87         .hs_hash       = lqe64_hash_hash,
88         .hs_key        = lqe64_hash_key,
89         .hs_keycmp     = lqe64_hash_keycmp,
90         .hs_object     = lqe_hash_object,
91         .hs_get        = lqe_hash_get,
92         .hs_put_locked = lqe_hash_put_locked,
93         .hs_exit       = lqe_hash_exit
94 };
95
96 /* Logging helper function */
97 void lquota_lqe_debug0(struct lquota_entry *lqe,
98                        struct libcfs_debug_msg_data *msgdata,
99                        const char *fmt, ...)
100 {
101         struct lquota_site *site = lqe->lqe_site;
102         va_list args;
103
104         LASSERT(site->lqs_ops->lqe_debug != NULL);
105
106         va_start(args, fmt);
107         site->lqs_ops->lqe_debug(lqe, site->lqs_parent, msgdata, fmt, args);
108         va_end(args);
109 }
110
111 struct lqe_iter_data {
112         unsigned long   lid_inuse;
113         unsigned long   lid_freed;
114         bool            lid_free_all;
115 };
116
117 static int lqe_iter_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
118                        cfs_hlist_node_t *hnode, void *data)
119 {
120         struct lqe_iter_data *d = (struct lqe_iter_data *)data;
121         struct lquota_entry  *lqe;
122
123         lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
124         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
125
126         /* Only one reference held by hash table, and nobody else can
127          * grab the entry at this moment, it's safe to remove it from
128          * the hash and free it. */
129         if (atomic_read(&lqe->lqe_ref) == 1) {
130                 if (!lqe_is_master(lqe)) {
131                         LASSERT(lqe->lqe_pending_write == 0);
132                         LASSERT(lqe->lqe_pending_req == 0);
133                 }
134                 if (d->lid_free_all || lqe->lqe_enforced) {
135                         d->lid_freed++;
136                         cfs_hash_bd_del_locked(hs, bd, hnode);
137                         return 0;
138                 }
139         }
140         d->lid_inuse++;
141
142         if (d->lid_free_all)
143                 LQUOTA_ERROR(lqe, "Inuse quota entry");
144         return 0;
145 }
146
147 /**
148  * Cleanup the entries in the hashtable
149  *
150  * \param hash     - hash table which stores quota entries
151  * \param free_all - free all entries or only free the entries
152  *                   without quota enforce ?
153  */
154 static void lqe_cleanup(cfs_hash_t *hash, bool free_all)
155 {
156         struct lqe_iter_data    d;
157         int                     repeat = 0;
158         ENTRY;
159 retry:
160         memset(&d, 0, sizeof(d));
161         d.lid_free_all = free_all;
162
163         cfs_hash_for_each_safe(hash, lqe_iter_cb, &d);
164
165         /* In most case, when this function is called on master or
166          * slave finalization, there should be no inuse quota entry.
167          *
168          * If the per-fs quota updating thread is still holding
169          * some entries, we just wait for it's finished. */
170         if (free_all && d.lid_inuse) {
171                 CDEBUG(D_QUOTA, "Hash:%p has entries inuse: inuse:%lu, "
172                         "freed:%lu, repeat:%u\n", hash,
173                         d.lid_inuse, d.lid_freed, repeat);
174                 repeat++;
175                 schedule_timeout_and_set_state(TASK_INTERRUPTIBLE,
176                                                 cfs_time_seconds(1));
177                 goto retry;
178         }
179         EXIT;
180 }
181
182 /*
183  * Allocate a new lquota site.
184  *
185  * \param env    - the environment passed by the caller
186  * \param parent - is a pointer to the parent structure, either a qmt_pool_info
187  *                 structure on the master or a qsd_qtype_info structure on the
188  *                 slave.
189  * \param is_master - is set when the site belongs to a QMT.
190  * \param qtype     - is the quota type managed by this site
191  * \param ops       - is the quota entry operation vector to be used for quota
192  *                    entry belonging to this site.
193  *
194  * \retval 0     - success
195  * \retval -ve   - failure
196  */
197 struct lquota_site *lquota_site_alloc(const struct lu_env *env, void *parent,
198                                       bool is_master, short qtype,
199                                       struct lquota_entry_operations *ops)
200 {
201         struct lquota_site      *site;
202         char                     hashname[15];
203         ENTRY;
204
205         LASSERT(qtype < MAXQUOTAS);
206
207         OBD_ALLOC_PTR(site);
208         if (site == NULL)
209                 RETURN(ERR_PTR(-ENOMEM));
210
211         /* assign parameters */
212         site->lqs_qtype  = qtype;
213         site->lqs_parent = parent;
214         site->lqs_is_mst = is_master;
215         site->lqs_ops    = ops;
216
217         /* allocate hash table */
218         memset(hashname, 0, sizeof(hashname));
219         sprintf(hashname, "LQUOTA_HASH%u", qtype);
220         site->lqs_hash= cfs_hash_create(hashname, hash_lqs_cur_bits,
221                                         HASH_LQE_MAX_BITS,
222                                         min(hash_lqs_cur_bits,
223                                             HASH_LQE_BKT_BITS),
224                                         0, CFS_HASH_MIN_THETA,
225                                         CFS_HASH_MAX_THETA, &lqe64_hash_ops,
226                                         CFS_HASH_DEFAULT|CFS_HASH_BIGNAME);
227         if (site->lqs_hash == NULL) {
228                 OBD_FREE_PTR(site);
229                 RETURN(ERR_PTR(-ENOMEM));
230         }
231
232         RETURN(site);
233 }
234
235 /*
236  * Destroy a lquota site.
237  *
238  * \param env  - the environment passed by the caller
239  * \param site - lquota site to be destroyed
240  *
241  * \retval 0     - success
242  * \retval -ve   - failure
243  */
244 void lquota_site_free(const struct lu_env *env, struct lquota_site *site)
245 {
246         /* cleanup hash table */
247         lqe_cleanup(site->lqs_hash, true);
248         cfs_hash_putref(site->lqs_hash);
249
250         site->lqs_parent = NULL;
251         OBD_FREE_PTR(site);
252 }
253
254 /*
255  * Initialize qsd/qmt-specific fields of quota entry.
256  *
257  * \param lqe - is the quota entry to initialize
258  */
259 static void lqe_init(struct lquota_entry *lqe)
260 {
261         struct lquota_site *site;
262         ENTRY;
263
264         LASSERT(lqe != NULL);
265         site = lqe->lqe_site;
266         LASSERT(site != NULL);
267         LASSERT(site->lqs_ops->lqe_init != NULL);
268
269         LQUOTA_DEBUG(lqe, "init");
270
271         site->lqs_ops->lqe_init(lqe, site->lqs_parent);
272 }
273
274 /*
275  * Update a lquota entry. This is done by reading quota settings from the
276  * on-disk index. The lquota entry must be write locked.
277  *
278  * \param env - the environment passed by the caller
279  * \param lqe - is the quota entry to refresh
280  */
281 static int lqe_read(const struct lu_env *env, struct lquota_entry *lqe)
282 {
283         struct lquota_site      *site;
284         int                      rc;
285         ENTRY;
286
287         LASSERT(lqe != NULL);
288         site = lqe->lqe_site;
289         LASSERT(site != NULL);
290         LASSERT(site->lqs_ops->lqe_read != NULL);
291
292         LQUOTA_DEBUG(lqe, "read");
293
294         rc = site->lqs_ops->lqe_read(env, lqe, site->lqs_parent);
295         if (rc == 0)
296                 /* mark the entry as up-to-date */
297                 lqe->lqe_uptodate = true;
298
299         RETURN(rc);
300 }
301
302 /*
303  * Find or create a quota entry.
304  *
305  * \param env  - the environment passed by the caller
306  * \param site - lquota site which stores quota entries in a hash table
307  * \param qid  - is the quota ID to be found/created
308  *
309  * \retval 0     - success
310  * \retval -ve   - failure
311  */
312 struct lquota_entry *lqe_locate(const struct lu_env *env,
313                                 struct lquota_site *site, union lquota_id *qid)
314 {
315         struct lquota_entry     *lqe, *new = NULL;
316         int                      rc = 0;
317         ENTRY;
318
319         lqe = cfs_hash_lookup(site->lqs_hash, (void *)&qid->qid_uid);
320         if (lqe != NULL) {
321                 LASSERT(lqe->lqe_uptodate);
322                 RETURN(lqe);
323         }
324
325         OBD_SLAB_ALLOC_PTR_GFP(new, lqe_kmem, __GFP_IO);
326         if (new == NULL) {
327                 CERROR("Fail to allocate lqe for id:"LPU64", "
328                         "hash:%s\n", qid->qid_uid, site->lqs_hash->hs_name);
329                 RETURN(ERR_PTR(-ENOMEM));
330         }
331
332         atomic_set(&new->lqe_ref, 1); /* hold 1 for caller */
333         new->lqe_id     = *qid;
334         new->lqe_site   = site;
335         CFS_INIT_LIST_HEAD(&new->lqe_link);
336
337         /* quota settings need to be updated from disk, that's why
338          * lqe->lqe_uptodate isn't set yet */
339         new->lqe_uptodate = false;
340
341         /* perform qmt/qsd specific initialization */
342         lqe_init(new);
343
344         /* read quota settings from disk and mark lqe as up-to-date */
345         rc = lqe_read(env, new);
346         if (rc)
347                 GOTO(out, lqe = ERR_PTR(rc));
348
349         /* add new entry to hash */
350         lqe = cfs_hash_findadd_unique(site->lqs_hash, &new->lqe_id.qid_uid,
351                                       &new->lqe_hash);
352         if (lqe == new)
353                 new = NULL;
354 out:
355         if (new)
356                 lqe_putref(new);
357         RETURN(lqe);
358 }