Whamcloud - gitweb
LU-9052 lod: accept lfs mkdir from old client
[fs/lustre-release.git] / lustre / quota / lquota_entry.c
1 /* GPL HEADER START
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License version 2 only,
7  * as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License version 2 for more details (a copy is included
13  * in the LICENSE file that accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License
16  * version 2 along with this program; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 021110-1307, USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2015, Intel Corporation.
24  * Use is subject to license terms.
25  *
26  * Author: Johann Lombardi <johann.lombardi@intel.com>
27  * Author: Niu    Yawei    <yawei.niu@intel.com>
28  */
29
30 #define DEBUG_SUBSYSTEM S_LQUOTA
31
32 #include <linux/module.h>
33 #include <linux/slab.h>
34 #include <obd_class.h>
35 #include "lquota_internal.h"
36
37 static int hash_lqs_cur_bits = HASH_LQE_CUR_BITS;
38 module_param(hash_lqs_cur_bits, int, 0444);
39 MODULE_PARM_DESC(hash_lqs_cur_bits, "the current bits of lqe hash");
40
41 static unsigned
42 lqe64_hash_hash(struct cfs_hash *hs, const void *key, unsigned mask)
43 {
44         return cfs_hash_u64_hash(*((__u64 *)key), mask);
45 }
46
47 static void *lqe64_hash_key(struct hlist_node *hnode)
48 {
49         struct lquota_entry *lqe;
50         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
51         return &lqe->lqe_id.qid_uid;
52 }
53
54 static int lqe64_hash_keycmp(const void *key, struct hlist_node *hnode)
55 {
56         struct lquota_entry *lqe;
57         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
58         return (lqe->lqe_id.qid_uid == *((__u64*)key));
59 }
60
61 static void *lqe_hash_object(struct hlist_node *hnode)
62 {
63         return hlist_entry(hnode, struct lquota_entry, lqe_hash);
64 }
65
66 static void lqe_hash_get(struct cfs_hash *hs, struct hlist_node *hnode)
67 {
68         struct lquota_entry *lqe;
69         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
70         lqe_getref(lqe);
71 }
72
73 static void lqe_hash_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
74 {
75         struct lquota_entry *lqe;
76         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
77         lqe_putref(lqe);
78 }
79
80 static void lqe_hash_exit(struct cfs_hash *hs, struct hlist_node *hnode)
81 {
82         CERROR("Should not have any item left!\n");
83 }
84
85 /* lqe hash methods for 64-bit uid/gid, new hash functions would have to be
86  * defined for per-directory quota relying on a 128-bit FID */
87 static struct cfs_hash_ops lqe64_hash_ops = {
88         .hs_hash       = lqe64_hash_hash,
89         .hs_key        = lqe64_hash_key,
90         .hs_keycmp     = lqe64_hash_keycmp,
91         .hs_object     = lqe_hash_object,
92         .hs_get        = lqe_hash_get,
93         .hs_put_locked = lqe_hash_put_locked,
94         .hs_exit       = lqe_hash_exit
95 };
96
97 /* Logging helper function */
98 void lquota_lqe_debug0(struct lquota_entry *lqe,
99                        struct libcfs_debug_msg_data *msgdata,
100                        const char *fmt, ...)
101 {
102         struct lquota_site *site = lqe->lqe_site;
103         va_list args;
104
105         LASSERT(site->lqs_ops->lqe_debug != NULL);
106
107         va_start(args, fmt);
108         site->lqs_ops->lqe_debug(lqe, site->lqs_parent, msgdata, fmt, args);
109         va_end(args);
110 }
111
112 struct lqe_iter_data {
113         unsigned long   lid_inuse;
114         unsigned long   lid_freed;
115         bool            lid_free_all;
116 };
117
118 static int lqe_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
119                        struct hlist_node *hnode, void *data)
120 {
121         struct lqe_iter_data *d = (struct lqe_iter_data *)data;
122         struct lquota_entry  *lqe;
123
124         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
125         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
126
127         /* Only one reference held by hash table, and nobody else can
128          * grab the entry at this moment, it's safe to remove it from
129          * the hash and free it. */
130         if (atomic_read(&lqe->lqe_ref) == 1) {
131                 if (!lqe_is_master(lqe)) {
132                         LASSERT(lqe->lqe_pending_write == 0);
133                         LASSERT(lqe->lqe_pending_req == 0);
134                 }
135                 if (d->lid_free_all || lqe->lqe_enforced) {
136                         d->lid_freed++;
137                         cfs_hash_bd_del_locked(hs, bd, hnode);
138                         return 0;
139                 }
140         }
141         d->lid_inuse++;
142
143         if (d->lid_free_all)
144                 LQUOTA_ERROR(lqe, "Inuse quota entry");
145         return 0;
146 }
147
148 /**
149  * Cleanup the entries in the hashtable
150  *
151  * \param hash     - hash table which stores quota entries
152  * \param free_all - free all entries or only free the entries
153  *                   without quota enforce ?
154  */
155 static void lqe_cleanup(struct cfs_hash *hash, bool free_all)
156 {
157         struct lqe_iter_data    d;
158         int                     repeat = 0;
159         ENTRY;
160 retry:
161         memset(&d, 0, sizeof(d));
162         d.lid_free_all = free_all;
163
164         cfs_hash_for_each_safe(hash, lqe_iter_cb, &d);
165
166         /* In most case, when this function is called on master or
167          * slave finalization, there should be no inuse quota entry.
168          *
169          * If the per-fs quota updating thread is still holding
170          * some entries, we just wait for it's finished. */
171         if (free_all && d.lid_inuse) {
172                 CDEBUG(D_QUOTA, "Hash:%p has entries inuse: inuse:%lu, "
173                         "freed:%lu, repeat:%u\n", hash,
174                         d.lid_inuse, d.lid_freed, repeat);
175                 repeat++;
176                 set_current_state(TASK_INTERRUPTIBLE);
177                 schedule_timeout(cfs_time_seconds(1));
178                 goto retry;
179         }
180         EXIT;
181 }
182
183 /*
184  * Allocate a new lquota site.
185  *
186  * \param env    - the environment passed by the caller
187  * \param parent - is a pointer to the parent structure, either a qmt_pool_info
188  *                 structure on the master or a qsd_qtype_info structure on the
189  *                 slave.
190  * \param is_master - is set when the site belongs to a QMT.
191  * \param qtype     - is the quota type managed by this site
192  * \param ops       - is the quota entry operation vector to be used for quota
193  *                    entry belonging to this site.
194  *
195  * \retval 0     - success
196  * \retval -ve   - failure
197  */
198 struct lquota_site *lquota_site_alloc(const struct lu_env *env, void *parent,
199                                       bool is_master, short qtype,
200                                       struct lquota_entry_operations *ops)
201 {
202         struct lquota_site      *site;
203         char                     hashname[15];
204         ENTRY;
205
206         if (qtype >= LL_MAXQUOTAS)
207                 RETURN(ERR_PTR(-ENOTSUPP));
208
209         OBD_ALLOC_PTR(site);
210         if (site == NULL)
211                 RETURN(ERR_PTR(-ENOMEM));
212
213         /* assign parameters */
214         site->lqs_qtype  = qtype;
215         site->lqs_parent = parent;
216         site->lqs_is_mst = is_master;
217         site->lqs_ops    = ops;
218
219         /* allocate hash table */
220         memset(hashname, 0, sizeof(hashname));
221         snprintf(hashname, sizeof(hashname), "LQUOTA_HASH%hu", qtype);
222         site->lqs_hash= cfs_hash_create(hashname, hash_lqs_cur_bits,
223                                         HASH_LQE_MAX_BITS,
224                                         min(hash_lqs_cur_bits,
225                                             HASH_LQE_BKT_BITS),
226                                         0, CFS_HASH_MIN_THETA,
227                                         CFS_HASH_MAX_THETA, &lqe64_hash_ops,
228                                         CFS_HASH_DEFAULT|CFS_HASH_BIGNAME);
229         if (site->lqs_hash == NULL) {
230                 OBD_FREE_PTR(site);
231                 RETURN(ERR_PTR(-ENOMEM));
232         }
233
234         RETURN(site);
235 }
236
237 /*
238  * Destroy a lquota site.
239  *
240  * \param env  - the environment passed by the caller
241  * \param site - lquota site to be destroyed
242  *
243  * \retval 0     - success
244  * \retval -ve   - failure
245  */
246 void lquota_site_free(const struct lu_env *env, struct lquota_site *site)
247 {
248         /* cleanup hash table */
249         lqe_cleanup(site->lqs_hash, true);
250         cfs_hash_putref(site->lqs_hash);
251
252         site->lqs_parent = NULL;
253         OBD_FREE_PTR(site);
254 }
255
256 /*
257  * Initialize qsd/qmt-specific fields of quota entry.
258  *
259  * \param lqe - is the quota entry to initialize
260  */
261 static void lqe_init(struct lquota_entry *lqe)
262 {
263         struct lquota_site *site;
264         ENTRY;
265
266         LASSERT(lqe != NULL);
267         site = lqe->lqe_site;
268         LASSERT(site != NULL);
269         LASSERT(site->lqs_ops->lqe_init != NULL);
270
271         LQUOTA_DEBUG(lqe, "init");
272
273         site->lqs_ops->lqe_init(lqe, site->lqs_parent);
274 }
275
276 /*
277  * Update a lquota entry. This is done by reading quota settings from the
278  * on-disk index. The lquota entry must be write locked.
279  *
280  * \param env - the environment passed by the caller
281  * \param lqe - is the quota entry to refresh
282  */
283 static int lqe_read(const struct lu_env *env, struct lquota_entry *lqe)
284 {
285         struct lquota_site      *site;
286         int                      rc;
287         ENTRY;
288
289         LASSERT(lqe != NULL);
290         site = lqe->lqe_site;
291         LASSERT(site != NULL);
292         LASSERT(site->lqs_ops->lqe_read != NULL);
293
294         LQUOTA_DEBUG(lqe, "read");
295
296         rc = site->lqs_ops->lqe_read(env, lqe, site->lqs_parent);
297         if (rc == 0)
298                 /* mark the entry as up-to-date */
299                 lqe->lqe_uptodate = true;
300
301         RETURN(rc);
302 }
303
304 /*
305  * Find or create a quota entry.
306  *
307  * \param env  - the environment passed by the caller
308  * \param site - lquota site which stores quota entries in a hash table
309  * \param qid  - is the quota ID to be found/created
310  *
311  * \retval 0     - success
312  * \retval -ve   - failure
313  */
314 struct lquota_entry *lqe_locate(const struct lu_env *env,
315                                 struct lquota_site *site, union lquota_id *qid)
316 {
317         struct lquota_entry     *lqe, *new = NULL;
318         int                      rc = 0;
319         ENTRY;
320
321         lqe = cfs_hash_lookup(site->lqs_hash, (void *)&qid->qid_uid);
322         if (lqe != NULL) {
323                 LASSERT(lqe->lqe_uptodate);
324                 RETURN(lqe);
325         }
326
327         OBD_SLAB_ALLOC_PTR_GFP(new, lqe_kmem, GFP_NOFS);
328         if (new == NULL) {
329                 CERROR("Fail to allocate lqe for id:%llu, "
330                         "hash:%s\n", qid->qid_uid, site->lqs_hash->hs_name);
331                 RETURN(ERR_PTR(-ENOMEM));
332         }
333
334         atomic_set(&new->lqe_ref, 1); /* hold 1 for caller */
335         new->lqe_id     = *qid;
336         new->lqe_site   = site;
337         INIT_LIST_HEAD(&new->lqe_link);
338
339         /* quota settings need to be updated from disk, that's why
340          * lqe->lqe_uptodate isn't set yet */
341         new->lqe_uptodate = false;
342
343         /* perform qmt/qsd specific initialization */
344         lqe_init(new);
345
346         /* read quota settings from disk and mark lqe as up-to-date */
347         rc = lqe_read(env, new);
348         if (rc)
349                 GOTO(out, lqe = ERR_PTR(rc));
350
351         /* add new entry to hash */
352         lqe = cfs_hash_findadd_unique(site->lqs_hash, &new->lqe_id.qid_uid,
353                                       &new->lqe_hash);
354         if (lqe == new)
355                 new = NULL;
356 out:
357         if (new)
358                 lqe_putref(new);
359         RETURN(lqe);
360 }