Whamcloud - gitweb
LU-1842 quota: qsd lock
[fs/lustre-release.git] / lustre / quota / qsd_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012 Whamcloud, Inc.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann@whamcloud.com>
28  * Author: Niu    Yawei    <niu@whamcloud.com>
29  */
30
31 /*
32  * Quota Slave Driver (QSD) management.
33  */
34
35 #ifndef EXPORT_SYMTAB
36 # define EXPORT_SYMTAB
37 #endif
38
39 #define DEBUG_SUBSYSTEM S_LQUOTA
40
41 #include "qsd_internal.h"
42
43 /* define qsd thread key */
44 LU_KEY_INIT_FINI(qsd, struct qsd_thread_info);
45 LU_CONTEXT_KEY_DEFINE(qsd, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL);
46 LU_KEY_INIT_GENERIC(qsd);
47
48 /* some procfs helpers */
49 static int lprocfs_qsd_rd_state(char *page, char **start, off_t off,
50                                 int count, int *eof, void *data)
51 {
52         struct qsd_instance     *qsd = (struct qsd_instance *)data;
53
54         LASSERT(qsd != NULL);
55
56         return snprintf(page, count,
57                         "target name:    %s\n"
58                         "pool ID:        %d\n"
59                         "type:           %s\n"
60                         "quota enabled:  none\n",
61                         qsd->qsd_svname, qsd->qsd_pool_id,
62                         qsd->qsd_is_md ? "md" : "dt");
63 }
64
65 static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
66         { "info", lprocfs_qsd_rd_state, 0, 0},
67         { NULL }
68 };
69
70 /*
71  * Release qsd_qtype_info structure which contains data associated with a
72  * given quota type. This releases the accounting objects.
73  * It's called on OSD cleanup when the qsd instance is released.
74  *
75  * \param env - is the environment passed by the caller
76  * \param qsd - is the qsd instance managing the qsd_qtype_info structure
77  *              to be released
78  * \param qtype - is the quota type to be shutdown
79  */
80 static void qsd_qtype_fini(const struct lu_env *env, struct qsd_instance *qsd,
81                            int qtype)
82 {
83         struct qsd_qtype_info   *qqi;
84         ENTRY;
85
86         if (qsd->qsd_type_array[qtype] == NULL)
87                 RETURN_EXIT;
88         qqi = qsd->qsd_type_array[qtype];
89         qsd->qsd_type_array[qtype] = NULL;
90
91         /* by now, all qqi users should have gone away */
92         LASSERT(cfs_atomic_read(&qqi->qqi_ref) == 1);
93         lu_ref_fini(&qqi->qqi_reference);
94
95         /* release accounting object */
96         if (qqi->qqi_acct_obj != NULL && !IS_ERR(qqi->qqi_acct_obj)) {
97                 lu_object_put(env, &qqi->qqi_acct_obj->do_lu);
98                 qqi->qqi_acct_obj = NULL;
99         }
100
101         /* release slv index */
102         if (qqi->qqi_slv_obj != NULL && !IS_ERR(qqi->qqi_slv_obj)) {
103                 lu_object_put(env, &qqi->qqi_slv_obj->do_lu);
104                 qqi->qqi_slv_obj = NULL;
105                 qqi->qqi_slv_ver = 0;
106         }
107
108         /* release global index */
109         if (qqi->qqi_glb_obj != NULL && !IS_ERR(qqi->qqi_glb_obj)) {
110                 lu_object_put(env, &qqi->qqi_glb_obj->do_lu);
111                 qqi->qqi_glb_obj = NULL;
112                 qqi->qqi_glb_ver = 0;
113         }
114
115         OBD_FREE_PTR(qqi);
116         EXIT;
117 }
118
119 /*
120  * Allocate and initialize a qsd_qtype_info structure for quota type \qtype.
121  * This opens the accounting object and initializes the proc file.
122  * It's called on OSD start when the qsd instance is created.
123  *
124  * \param env  - the environment passed by the caller
125  * \param qsd  - is the qsd instance which will be in charge of the new
126  *               qsd_qtype_info instance.
127  * \param qtype - is quota type to set up
128  *
129  * \retval - 0 on success and qsd->qsd_type_array[qtype] is allocated,
130  *           appropriate error on failure
131  */
132 static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
133                           int qtype)
134 {
135         struct qsd_qtype_info   *qqi;
136         int                      rc;
137         struct obd_uuid          uuid;
138         ENTRY;
139
140         LASSERT(qsd->qsd_type_array[qtype] == NULL);
141
142         /* allocate structure for this quota type */
143         OBD_ALLOC_PTR(qqi);
144         if (qqi == NULL)
145                 RETURN(-ENOMEM);
146         qsd->qsd_type_array[qtype] = qqi;
147         cfs_atomic_set(&qqi->qqi_ref, 1); /* referenced from qsd */
148
149         /* set backpointer and other parameters */
150         qqi->qqi_qsd   = qsd;
151         qqi->qqi_qtype = qtype;
152         lu_ref_init(&qqi->qqi_reference);
153         lquota_generate_fid(&qqi->qqi_fid, qsd->qsd_pool_id, QSD_RES_TYPE(qsd),
154                             qtype);
155         qqi->qqi_glb_uptodate = false;
156         qqi->qqi_slv_uptodate = false;
157         qqi->qqi_reint        = false;
158         memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
159
160         /* open accounting object */
161         LASSERT(qqi->qqi_acct_obj == NULL);
162         qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev,
163                                             qtype == USRQUOTA ? ACCT_USER_OID
164                                                               : ACCT_GROUP_OID);
165         /* don't print any error message on failure in order not to confuse
166          * non-OFD user (e.g. 2.3 MDT stack) */
167         if (IS_ERR(qqi->qqi_acct_obj))
168                 qqi->qqi_acct_obj = NULL;
169
170         /* open global index copy */
171         LASSERT(qqi->qqi_glb_obj == NULL);
172         qqi->qqi_glb_obj = lquota_disk_glb_find_create(env, qsd->qsd_dev,
173                                                        qsd->qsd_root,
174                                                        &qqi->qqi_fid, true);
175         if (IS_ERR(qqi->qqi_glb_obj)) {
176                 CERROR("%s: can't open global index copy "DFID" %ld\n",
177                        qsd->qsd_svname, PFID(&qqi->qqi_fid),
178                        PTR_ERR(qqi->qqi_glb_obj));
179                 GOTO(out, rc = PTR_ERR(qqi->qqi_glb_obj));
180         }
181         qqi->qqi_glb_ver = dt_version_get(env, qqi->qqi_glb_obj);
182
183         /* open slave index copy */
184         LASSERT(qqi->qqi_slv_obj == NULL);
185         obd_str2uuid(&uuid, qsd->qsd_svname);
186         qqi->qqi_slv_obj = lquota_disk_slv_find_create(env, qsd->qsd_dev,
187                                                        qsd->qsd_root,
188                                                        &qqi->qqi_fid, &uuid,
189                                                        true);
190         if (IS_ERR(qqi->qqi_slv_obj)) {
191                 CERROR("%s: can't open slave index copy "DFID" %ld\n",
192                        qsd->qsd_svname, PFID(&qqi->qqi_fid),
193                        PTR_ERR(qqi->qqi_slv_obj));
194                 GOTO(out, rc = PTR_ERR(qqi->qqi_slv_obj));
195         }
196         qqi->qqi_slv_ver = dt_version_get(env, qqi->qqi_slv_obj);
197
198         /* register proc entry for accounting object */
199         rc = lprocfs_seq_create(qsd->qsd_proc,
200                                 qtype == USRQUOTA ? "acct_user" : "acct_group",
201                                 0444, &lprocfs_quota_seq_fops,
202                                 qqi->qqi_acct_obj);
203         if (rc) {
204                 CWARN("%s: can't add procfs entry for accounting file %d\n",
205                       qsd->qsd_svname, rc);
206                 GOTO(out, rc);
207         }
208
209         EXIT;
210 out:
211         if (rc)
212                 qsd_qtype_fini(env, qsd, qtype);
213         return rc;
214 }
215
216 /*
217  * Release a qsd_instance. Companion of qsd_init(). This releases all data
218  * structures associated with the quota slave.
219  * This function should be called when the OSD is shutting down.
220  *
221  * \param env - is the environment passed by the caller
222  * \param qsd - is the qsd instance to shutdown
223  */
224 void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
225 {
226         int     qtype;
227         ENTRY;
228
229         CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname);
230         qsd->qsd_stopping = true;
231
232         /* remove qsd proc entry */
233         if (qsd->qsd_proc != NULL && !IS_ERR(qsd->qsd_proc)) {
234                 lprocfs_remove(&qsd->qsd_proc);
235                 qsd->qsd_proc = NULL;
236         }
237
238         /* free per-quota type data */
239         for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
240                 qsd_qtype_fini(env, qsd, qtype);
241
242         /* release quota root directory */
243         if (qsd->qsd_root != NULL && !IS_ERR(qsd->qsd_root)) {
244                 lu_object_put(env, &qsd->qsd_root->do_lu);
245                 qsd->qsd_root = NULL;
246         }
247
248         /* release reference on dt_device */
249         if (qsd->qsd_dev != NULL) {
250                 lu_ref_del(&qsd->qsd_dev->dd_lu_dev.ld_reference, "qsd", qsd);
251                 lu_device_put(&qsd->qsd_dev->dd_lu_dev);
252                 qsd->qsd_dev = NULL;
253         }
254
255         OBD_FREE_PTR(qsd);
256         EXIT;
257 }
258 EXPORT_SYMBOL(qsd_fini);
259
260 /*
261  * Create a new qsd_instance to be associated with backend osd device
262  * identified by \dev. For now, this function just create procfs files which
263  * dumps the accounting information
264  *
265  * \param env    - the environment passed by the caller
266  * \param svname - is the service name of the OSD device creating this instance
267  * \param dev    - is the dt_device where to store quota index files
268  * \param osd_proc - is the procfs parent directory where to create procfs file
269  *                   related to this new qsd instance
270  *
271  * \retval - pointer to new qsd_instance associated with dev \dev on success,
272  *           appropriate error on failure
273  */
274 struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
275                               struct dt_device *dev,
276                               cfs_proc_dir_entry_t *osd_proc)
277 {
278         struct qsd_instance     *qsd;
279         int                      rc, qtype;
280         ENTRY;
281
282         /* allocate qsd instance */
283         OBD_ALLOC_PTR(qsd);
284         if (qsd == NULL)
285                 RETURN(ERR_PTR(-ENOMEM));
286
287         cfs_rwlock_init(&qsd->qsd_lock);
288         /* copy service name */
289         strncpy(qsd->qsd_svname, svname, MAX_OBD_NAME);
290
291         /* grab reference on osd device */
292         lu_device_get(&dev->dd_lu_dev);
293         lu_ref_add(&dev->dd_lu_dev.ld_reference, "qsd", qsd);
294         qsd->qsd_dev = dev;
295
296         /* we only support pool ID 0 (default data or metadata pool) for the
297          * time being. A different pool ID could be assigned to this target via
298          * the configuration log in the future */
299         qsd->qsd_pool_id  = 0;
300
301         /* Record whether this qsd instance is managing quota enforcement for a
302          * MDT (i.e. inode quota) or OST (block quota) */
303         qsd->qsd_is_md = lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev);
304
305         /* look-up on-disk directory for the quota slave */
306         qsd->qsd_root = lquota_disk_dir_find_create(env, dev, NULL, QSD_DIR);
307         if (IS_ERR(qsd->qsd_root)) {
308                 rc = PTR_ERR(qsd->qsd_root);
309                 CERROR("%s: failed to create quota slave root dir (%d)\n",
310                        svname, rc);
311                 GOTO(out, rc);
312         }
313
314         /* register procfs directory */
315         qsd->qsd_proc = lprocfs_register(QSD_DIR, osd_proc,
316                                          lprocfs_quota_qsd_vars, qsd);
317         if (IS_ERR(qsd->qsd_proc)) {
318                 rc = PTR_ERR(qsd->qsd_proc);
319                 CERROR("%s: fail to create quota slave proc entry (%d)\n",
320                        svname, rc);
321                 GOTO(out, rc);
322         }
323
324         /* initialize per-quota type data */
325         for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
326                 rc = qsd_qtype_init(env, qsd, qtype);
327                 if (rc)
328                         GOTO(out, rc);
329         }
330 out:
331         if (rc) {
332                 qsd_fini(env, qsd);
333                 return ERR_PTR(rc);
334         }
335         RETURN(qsd);
336 }
337 EXPORT_SYMBOL(qsd_init);
338
339 /*
340  * Global initialization performed at module load time
341  */
342 int qsd_glb_init(void)
343 {
344         qsd_key_init_generic(&qsd_thread_key, NULL);
345         lu_context_key_register(&qsd_thread_key);
346         return 0;
347 }
348
349 /*
350  * Companion of qsd_glb_init() called at module unload time
351  */
352 void qsd_glb_fini(void)
353 {
354         lu_context_key_degister(&qsd_thread_key);
355 }