Whamcloud - gitweb
LU-1842 quota: add per-filesystem information
[fs/lustre-release.git] / lustre / quota / qsd_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012 Intel, Inc.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * Quota Slave Driver (QSD) management.
33  *
34  * The quota slave feature is implemented under the form of a library called
35  * QSD. Each OSD device should create a QSD instance via qsd_init() which will
36  * be used to manage quota enforcement for this device. This implies:
37  * - completing the reintegration procedure with the quota master (aka QMT, see
38  *   qmt_dev.c) to retrieve the latest quota settings and space distribution.
39  * - managing quota locks in order to be notified of configuration changes.
40  * - acquiring space from the QMT when quota space for a given user/group is
41  *   close to exhaustion.
42  * - allocating quota space to service threads for local request processing.
43  *
44  * Once the QSD instance created, the OSD device should invoke qsd_start()
45  * when recovery is completed. This notifies the QSD that we are about to
46  * process new requests on which quota should be strictly enforced.
47  * Then, qsd_op_begin/end can be used to reserve/release/pre-acquire quota space
48  * for/after each operation until shutdown where the QSD instance should be
49  * freed via qsd_fini().
50  */
51
52 #ifndef EXPORT_SYMTAB
53 # define EXPORT_SYMTAB
54 #endif
55
56 #define DEBUG_SUBSYSTEM S_LQUOTA
57
58 #include <obd_class.h>
59 #include "qsd_internal.h"
60
61 /* define qsd thread key */
62 LU_KEY_INIT_FINI(qsd, struct qsd_thread_info);
63 LU_CONTEXT_KEY_DEFINE(qsd, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL);
64 LU_KEY_INIT_GENERIC(qsd);
65
66 /* some procfs helpers */
67 static int lprocfs_qsd_rd_state(char *page, char **start, off_t off,
68                                 int count, int *eof, void *data)
69 {
70         struct qsd_instance     *qsd = (struct qsd_instance *)data;
71
72         LASSERT(qsd != NULL);
73
74         return snprintf(page, count,
75                         "target name:    %s\n"
76                         "pool ID:        %d\n"
77                         "type:           %s\n"
78                         "quota enabled:  none\n",
79                         qsd->qsd_svname, qsd->qsd_pool_id,
80                         qsd->qsd_is_md ? "md" : "dt");
81 }
82
83 static int lprocfs_qsd_rd_enabled(char *page, char **start, off_t off,
84                                   int count, int *eof, void *data)
85 {
86         struct qsd_instance     *qsd = (struct qsd_instance *)data;
87         char                     enabled[5];
88         LASSERT(qsd != NULL);
89
90         memset(enabled, 0, sizeof(enabled));
91         if (qsd_type_enabled(qsd, USRQUOTA))
92                 strcat(enabled, "u");
93         if (qsd_type_enabled(qsd, GRPQUOTA))
94                 strcat(enabled, "g");
95         if (strlen(enabled) == 0)
96                 strcat(enabled, "none");
97
98         return snprintf(page, count, "%s\n", enabled);
99 }
100
101 static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
102         { "info", lprocfs_qsd_rd_state, 0, 0},
103         { "enabled", lprocfs_qsd_rd_enabled, 0, 0},
104         { NULL }
105 };
106
107 /*
108  * Release qsd_qtype_info structure which contains data associated with a
109  * given quota type. This releases the accounting objects.
110  * It's called on OSD cleanup when the qsd instance is released.
111  *
112  * \param env - is the environment passed by the caller
113  * \param qsd - is the qsd instance managing the qsd_qtype_info structure
114  *              to be released
115  * \param qtype - is the quota type to be shutdown
116  */
117 static void qsd_qtype_fini(const struct lu_env *env, struct qsd_instance *qsd,
118                            int qtype)
119 {
120         struct qsd_qtype_info   *qqi;
121         ENTRY;
122
123         if (qsd->qsd_type_array[qtype] == NULL)
124                 RETURN_EXIT;
125         qqi = qsd->qsd_type_array[qtype];
126         qsd->qsd_type_array[qtype] = NULL;
127
128         /* by now, all qqi users should have gone away */
129         LASSERT(cfs_atomic_read(&qqi->qqi_ref) == 1);
130         lu_ref_fini(&qqi->qqi_reference);
131
132         /* release accounting object */
133         if (qqi->qqi_acct_obj != NULL && !IS_ERR(qqi->qqi_acct_obj)) {
134                 lu_object_put(env, &qqi->qqi_acct_obj->do_lu);
135                 qqi->qqi_acct_obj = NULL;
136         }
137
138         /* release slv index */
139         if (qqi->qqi_slv_obj != NULL && !IS_ERR(qqi->qqi_slv_obj)) {
140                 lu_object_put(env, &qqi->qqi_slv_obj->do_lu);
141                 qqi->qqi_slv_obj = NULL;
142                 qqi->qqi_slv_ver = 0;
143         }
144
145         /* release global index */
146         if (qqi->qqi_glb_obj != NULL && !IS_ERR(qqi->qqi_glb_obj)) {
147                 lu_object_put(env, &qqi->qqi_glb_obj->do_lu);
148                 qqi->qqi_glb_obj = NULL;
149                 qqi->qqi_glb_ver = 0;
150         }
151
152         OBD_FREE_PTR(qqi);
153         EXIT;
154 }
155
156 /*
157  * Allocate and initialize a qsd_qtype_info structure for quota type \qtype.
158  * This opens the accounting object and initializes the proc file.
159  * It's called on OSD start when the qsd_prepare() is invoked on the qsd
160  * instance.
161  *
162  * \param env  - the environment passed by the caller
163  * \param qsd  - is the qsd instance which will be in charge of the new
164  *               qsd_qtype_info instance.
165  * \param qtype - is quota type to set up
166  *
167  * \retval - 0 on success and qsd->qsd_type_array[qtype] is allocated,
168  *           appropriate error on failure
169  */
170 static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
171                           int qtype)
172 {
173         struct qsd_qtype_info   *qqi;
174         int                      rc;
175         struct obd_uuid          uuid;
176         ENTRY;
177
178         LASSERT(qsd->qsd_type_array[qtype] == NULL);
179
180         /* allocate structure for this quota type */
181         OBD_ALLOC_PTR(qqi);
182         if (qqi == NULL)
183                 RETURN(-ENOMEM);
184         qsd->qsd_type_array[qtype] = qqi;
185         cfs_atomic_set(&qqi->qqi_ref, 1); /* referenced from qsd */
186
187         /* set backpointer and other parameters */
188         qqi->qqi_qsd   = qsd;
189         qqi->qqi_qtype = qtype;
190         lu_ref_init(&qqi->qqi_reference);
191         lquota_generate_fid(&qqi->qqi_fid, qsd->qsd_pool_id, QSD_RES_TYPE(qsd),
192                             qtype);
193         qqi->qqi_glb_uptodate = false;
194         qqi->qqi_slv_uptodate = false;
195         qqi->qqi_reint        = false;
196         memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
197
198         /* open accounting object */
199         LASSERT(qqi->qqi_acct_obj == NULL);
200         qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev,
201                                             qtype == USRQUOTA ? ACCT_USER_OID
202                                                               : ACCT_GROUP_OID);
203         /* don't print any error message on failure in order not to confuse
204          * non-OFD user (e.g. 2.3 MDT stack) */
205         if (IS_ERR(qqi->qqi_acct_obj))
206                 qqi->qqi_acct_obj = NULL;
207
208         /* open global index copy */
209         LASSERT(qqi->qqi_glb_obj == NULL);
210         qqi->qqi_glb_obj = lquota_disk_glb_find_create(env, qsd->qsd_dev,
211                                                        qsd->qsd_root,
212                                                        &qqi->qqi_fid, true);
213         if (IS_ERR(qqi->qqi_glb_obj)) {
214                 CERROR("%s: can't open global index copy "DFID" %ld\n",
215                        qsd->qsd_svname, PFID(&qqi->qqi_fid),
216                        PTR_ERR(qqi->qqi_glb_obj));
217                 GOTO(out, rc = PTR_ERR(qqi->qqi_glb_obj));
218         }
219         qqi->qqi_glb_ver = dt_version_get(env, qqi->qqi_glb_obj);
220
221         /* open slave index copy */
222         LASSERT(qqi->qqi_slv_obj == NULL);
223         obd_str2uuid(&uuid, qsd->qsd_svname);
224         qqi->qqi_slv_obj = lquota_disk_slv_find_create(env, qsd->qsd_dev,
225                                                        qsd->qsd_root,
226                                                        &qqi->qqi_fid, &uuid,
227                                                        true);
228         if (IS_ERR(qqi->qqi_slv_obj)) {
229                 CERROR("%s: can't open slave index copy "DFID" %ld\n",
230                        qsd->qsd_svname, PFID(&qqi->qqi_fid),
231                        PTR_ERR(qqi->qqi_slv_obj));
232                 GOTO(out, rc = PTR_ERR(qqi->qqi_slv_obj));
233         }
234         qqi->qqi_slv_ver = dt_version_get(env, qqi->qqi_slv_obj);
235
236         /* register proc entry for accounting object */
237         rc = lprocfs_seq_create(qsd->qsd_proc,
238                                 qtype == USRQUOTA ? "acct_user" : "acct_group",
239                                 0444, &lprocfs_quota_seq_fops,
240                                 qqi->qqi_acct_obj);
241         if (rc) {
242                 CWARN("%s: can't add procfs entry for accounting file %d\n",
243                       qsd->qsd_svname, rc);
244                 GOTO(out, rc);
245         }
246
247         EXIT;
248 out:
249         if (rc)
250                 qsd_qtype_fini(env, qsd, qtype);
251         return rc;
252 }
253
254 /*
255  * Release a qsd_instance. Companion of qsd_init(). This releases all data
256  * structures associated with the quota slave.
257  * This function should be called when the OSD is shutting down.
258  *
259  * \param env - is the environment passed by the caller
260  * \param qsd - is the qsd instance to shutdown
261  */
262 void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
263 {
264         int     qtype;
265         ENTRY;
266
267         CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname);
268         qsd->qsd_stopping = true;
269
270         /* remove from the list of fsinfo */
271         if (!cfs_list_empty(&qsd->qsd_link)) {
272                 LASSERT(qsd->qsd_fsinfo != NULL);
273                 cfs_down(&qsd->qsd_fsinfo->qfs_sem);
274                 cfs_list_del_init(&qsd->qsd_link);
275                 cfs_up(&qsd->qsd_fsinfo->qfs_sem);
276         }
277
278         /* remove qsd proc entry */
279         if (qsd->qsd_proc != NULL) {
280                 lprocfs_remove(&qsd->qsd_proc);
281                 qsd->qsd_proc = NULL;
282         }
283
284         /* free per-quota type data */
285         for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
286                 qsd_qtype_fini(env, qsd, qtype);
287
288         /* release per-filesystem information */
289         if (qsd->qsd_fsinfo != NULL)
290                 qsd_put_fsinfo(qsd->qsd_fsinfo);
291
292         /* release quota root directory */
293         if (qsd->qsd_root != NULL) {
294                 lu_object_put(env, &qsd->qsd_root->do_lu);
295                 qsd->qsd_root = NULL;
296         }
297
298         /* release reference on dt_device */
299         if (qsd->qsd_dev != NULL) {
300                 lu_ref_del(&qsd->qsd_dev->dd_lu_dev.ld_reference, "qsd", qsd);
301                 lu_device_put(&qsd->qsd_dev->dd_lu_dev);
302                 qsd->qsd_dev = NULL;
303         }
304
305         OBD_FREE_PTR(qsd);
306         EXIT;
307 }
308 EXPORT_SYMBOL(qsd_fini);
309
310 /*
311  * Create a new qsd_instance to be associated with backend osd device
312  * identified by \dev.
313  *
314  * \param env    - the environment passed by the caller
315  * \param svname - is the service name of the OSD device creating this instance
316  * \param dev    - is the dt_device where to store quota index files
317  * \param osd_proc - is the procfs parent directory where to create procfs file
318  *                   related to this new qsd instance
319  *
320  * \retval - pointer to new qsd_instance associated with dev \dev on success,
321  *           appropriate error on failure
322  */
323 struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
324                               struct dt_device *dev,
325                               cfs_proc_dir_entry_t *osd_proc)
326 {
327         struct qsd_thread_info  *qti = qsd_info(env);
328         struct qsd_instance     *qsd;
329         int                      rc;
330         ENTRY;
331
332         /* allocate qsd instance */
333         OBD_ALLOC_PTR(qsd);
334         if (qsd == NULL)
335                 RETURN(ERR_PTR(-ENOMEM));
336
337         cfs_rwlock_init(&qsd->qsd_lock);
338         CFS_INIT_LIST_HEAD(&qsd->qsd_link);
339         /* copy service name */
340         strncpy(qsd->qsd_svname, svname, MAX_OBD_NAME);
341
342         /* grab reference on osd device */
343         lu_device_get(&dev->dd_lu_dev);
344         lu_ref_add(&dev->dd_lu_dev.ld_reference, "qsd", qsd);
345         qsd->qsd_dev = dev;
346
347         /* we only support pool ID 0 (default data or metadata pool) for the
348          * time being. A different pool ID could be assigned to this target via
349          * the configuration log in the future */
350         qsd->qsd_pool_id  = 0;
351
352         /* get fsname from svname */
353         rc = server_name2fsname(svname, qti->qti_buf, NULL);
354         if (rc) {
355                 CERROR("%s: fail to extract filesystem name\n", svname);
356                 GOTO(out, rc);
357         }
358
359         /* look up quota setting for the filesystem the target belongs to */
360         qsd->qsd_fsinfo = qsd_get_fsinfo(qti->qti_buf, 1);
361         if (qsd->qsd_fsinfo == NULL) {
362                 CERROR("%s: failed to locate filesystem information\n", svname);
363                 GOTO(out, rc = -EINVAL);
364         }
365
366         /* add in the list of lquota_fsinfo */
367         cfs_down(&qsd->qsd_fsinfo->qfs_sem);
368         list_add_tail(&qsd->qsd_link, &qsd->qsd_fsinfo->qfs_qsd_list);
369         cfs_up(&qsd->qsd_fsinfo->qfs_sem);
370
371         /* register procfs directory */
372         qsd->qsd_proc = lprocfs_register(QSD_DIR, osd_proc,
373                                          lprocfs_quota_qsd_vars, qsd);
374         if (IS_ERR(qsd->qsd_proc)) {
375                 rc = PTR_ERR(qsd->qsd_proc);
376                 qsd->qsd_proc = NULL;
377                 CERROR("%s: fail to create quota slave proc entry (%d)\n",
378                        svname, rc);
379                 GOTO(out, rc);
380         }
381 out:
382         if (rc) {
383                 qsd_fini(env, qsd);
384                 return ERR_PTR(rc);
385         }
386         RETURN(qsd);
387 }
388 EXPORT_SYMBOL(qsd_init);
389
390 /*
391  * Initialize on-disk structures in order to manage quota enforcement for
392  * the target associated with the qsd instance \qsd and starts the reintegration
393  * procedure for each quota type as soon as possible.
394  * The last step of the reintegration will be completed once qsd_start() is
395  * called, at which points the space reconciliation with the master will be
396  * executed.
397  * This function must be called when the server stack is fully configured,
398  * typically when ->ldo_prepare is called across the stack.
399  *
400  * \param env - the environment passed by the caller
401  * \param qsd - is qsd_instance to prepare
402  *
403  * \retval - 0 on success, appropriate error on failure
404  */
405 int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
406 {
407         int     rc, qtype;
408         ENTRY;
409
410         LASSERT(qsd != NULL);
411
412         /* Record whether this qsd instance is managing quota enforcement for a
413          * MDT (i.e. inode quota) or OST (block quota) */
414         if (lu_device_is_md(qsd->qsd_dev->dd_lu_dev.ld_site->ls_top_dev))
415                 qsd->qsd_is_md = true;
416
417         /* look-up on-disk directory for the quota slave */
418         qsd->qsd_root = lquota_disk_dir_find_create(env, qsd->qsd_dev, NULL,
419                                                     QSD_DIR);
420         if (IS_ERR(qsd->qsd_root)) {
421                 rc = PTR_ERR(qsd->qsd_root);
422                 qsd->qsd_root = NULL;
423                 CERROR("%s: failed to create quota slave root dir (%d)\n",
424                        qsd->qsd_svname, rc);
425                 RETURN(rc);
426         }
427
428         /* initialize per-quota type data */
429         for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
430                 rc = qsd_qtype_init(env, qsd, qtype);
431                 if (rc)
432                         RETURN(rc);
433         }
434
435         RETURN(0);
436 }
437 EXPORT_SYMBOL(qsd_prepare);
438
439 void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg));
440
441 /*
442  * Global initialization performed at module load time
443  */
444 int qsd_glb_init(void)
445 {
446         qsd_key_init_generic(&qsd_thread_key, NULL);
447         lu_context_key_register(&qsd_thread_key);
448         lustre_register_quota_process_config(qsd_process_config);
449         return 0;
450 }
451
452 /*
453  * Companion of qsd_glb_init() called at module unload time
454  */
455 void qsd_glb_fini(void)
456 {
457         lustre_register_quota_process_config(NULL);
458         lu_context_key_degister(&qsd_thread_key);
459 }