Whamcloud - gitweb
LU-16518 misc: use fixed hash code
[fs/lustre-release.git] / lustre / quota / qsd_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * Quota Slave Driver (QSD) management.
33  *
34  * The quota slave feature is implemented under the form of a library called
35  * QSD. Each OSD device should create a QSD instance via qsd_init() which will
36  * be used to manage quota enforcement for this device. This implies:
37  * - completing the reintegration procedure with the quota master (aka QMT, see
38  *   qmt_dev.c) to retrieve the latest quota settings and space distribution.
39  * - managing quota locks in order to be notified of configuration changes.
40  * - acquiring space from the QMT when quota space for a given user/group is
41  *   close to exhaustion.
42  * - allocating quota space to service threads for local request processing.
43  *
44  * Once the QSD instance is created, the OSD device should invoke qsd_start()
45  * when recovery is completed. This notifies the QSD that we are about to
46  * process new requests on which quota should be strictly enforced.
47  * Then, qsd_op_begin/end can be used to reserve/release/pre-acquire quota space
48  * for/after each operation until shutdown where the QSD instance should be
49  * freed via qsd_fini().
50  */
51
52 #define DEBUG_SUBSYSTEM S_LQUOTA
53
54 #include <obd_class.h>
55 #include "qsd_internal.h"
56
57 struct kmem_cache *upd_kmem;
58
59 struct lu_kmem_descr qsd_caches[] = {
60         {
61                 .ckd_cache = &upd_kmem,
62                 .ckd_name  = "upd_kmem",
63                 .ckd_size  = sizeof(struct qsd_upd_rec)
64         },
65         {
66                 .ckd_cache = NULL
67         }
68 };
69
70 /* define qsd thread key */
71 LU_KEY_INIT_FINI(qsd, struct qsd_thread_info);
72 LU_CONTEXT_KEY_DEFINE(qsd, LCT_MD_THREAD | LCT_MG_THREAD | LCT_DT_THREAD | LCT_LOCAL);
73 LU_KEY_INIT_GENERIC(qsd);
74
75 /* some procfs helpers */
76 static int qsd_state_seq_show(struct seq_file *m, void *data)
77 {
78         struct qsd_instance     *qsd = m->private;
79         char                     enabled[5];
80
81         LASSERT(qsd != NULL);
82
83         memset(enabled, 0, sizeof(enabled));
84         if (qsd_type_enabled(qsd, USRQUOTA))
85                 strcat(enabled, "u");
86         if (qsd_type_enabled(qsd, GRPQUOTA))
87                 strcat(enabled, "g");
88         if (qsd_type_enabled(qsd, PRJQUOTA))
89                 strncat(enabled, "p", 1);
90         if (strlen(enabled) == 0)
91                 strcat(enabled, "none");
92
93         /* TODO: further pool ID should be removed or
94          * replaced with pool Name */
95         seq_printf(m, "target name:    %s\n"
96                    "pool ID:        %d\n"
97                    "type:           %s\n"
98                    "quota enabled:  %s\n"
99                    "conn to master: %s\n",
100                    qsd->qsd_svname, 0,
101                    qsd->qsd_is_md ? "md" : "dt", enabled,
102                    qsd->qsd_exp_valid ? "setup" : "not setup yet");
103
104         if (qsd->qsd_prepared) {
105                 memset(enabled, 0, sizeof(enabled));
106                 if (qsd->qsd_type_array[USRQUOTA]->qqi_acct_obj != NULL)
107                         strcat(enabled, "u");
108                 if (qsd->qsd_type_array[GRPQUOTA]->qqi_acct_obj != NULL)
109                         strcat(enabled, "g");
110                 if (qsd->qsd_type_array[PRJQUOTA]->qqi_acct_obj != NULL)
111                         strncat(enabled, "p", 1);
112                 if (strlen(enabled) == 0)
113                         strcat(enabled, "none");
114                 seq_printf(m, "space acct:     %s\n"
115                            "user uptodate:  glb[%d],slv[%d],reint[%d]\n"
116                            "group uptodate: glb[%d],slv[%d],reint[%d]\n"
117                            "project uptodate: glb[%d],slv[%d],reint[%d]\n",
118                            enabled,
119                            qsd->qsd_type_array[USRQUOTA]->qqi_glb_uptodate,
120                            qsd->qsd_type_array[USRQUOTA]->qqi_slv_uptodate,
121                            qsd->qsd_type_array[USRQUOTA]->qqi_reint,
122                            qsd->qsd_type_array[GRPQUOTA]->qqi_glb_uptodate,
123                            qsd->qsd_type_array[GRPQUOTA]->qqi_slv_uptodate,
124                            qsd->qsd_type_array[GRPQUOTA]->qqi_reint,
125                            qsd->qsd_type_array[PRJQUOTA]->qqi_glb_uptodate,
126                            qsd->qsd_type_array[PRJQUOTA]->qqi_slv_uptodate,
127                            qsd->qsd_type_array[PRJQUOTA]->qqi_reint);
128         }
129         return 0;
130 }
131 LPROC_SEQ_FOPS_RO(qsd_state);
132
133 static int qsd_enabled_seq_show(struct seq_file *m, void *data)
134 {
135         struct qsd_instance     *qsd = m->private;
136         char                     enabled[5];
137
138         LASSERT(qsd != NULL);
139
140         memset(enabled, 0, sizeof(enabled));
141         if (qsd_type_enabled(qsd, USRQUOTA))
142                 strncat(enabled, "u", sizeof(enabled) - strlen(enabled));
143         if (qsd_type_enabled(qsd, GRPQUOTA))
144                 strncat(enabled, "g", sizeof(enabled) - strlen(enabled));
145         if (qsd_type_enabled(qsd, PRJQUOTA))
146                 strncat(enabled, "p", sizeof(enabled) - strlen(enabled));
147         if (strlen(enabled) == 0)
148                 strncat(enabled, "none", sizeof(enabled) - strlen(enabled));
149
150         seq_printf(m, "%s\n", enabled);
151         return 0;
152 }
153
154 static ssize_t qsd_enabled_seq_write(struct file *file,
155                                      const char __user *buffer,
156                                      size_t count, loff_t *off)
157 {
158         struct seq_file *m = file->private_data;
159         struct qsd_instance *qsd = m->private;
160         char fsname[LUSTRE_MAXFSNAME + 1];
161         int enabled = 0;
162         char valstr[5];
163         int pool, rc;
164
165         if (count > 4)
166                 return -E2BIG;
167
168         if (copy_from_user(valstr, buffer, count))
169                 GOTO(out, count = -EFAULT);
170
171         valstr[sizeof(valstr) - 1] = 0;
172         if (strchr(valstr, 'u'))
173                 enabled |= BIT(USRQUOTA);
174         if (strchr(valstr, 'g'))
175                 enabled |= BIT(GRPQUOTA);
176         if (strchr(valstr, 'p'))
177                 enabled |= BIT(PRJQUOTA);
178
179         if (enabled == 0 && strcmp(valstr, "none"))
180                 GOTO(out, count = -EINVAL);
181
182         if (qsd->qsd_is_md)
183                 pool = LQUOTA_RES_MD;
184         else
185                 pool = LQUOTA_RES_DT;
186
187         if (server_name2fsname(qsd->qsd_svname, fsname, NULL))
188                 GOTO(out, count = -EINVAL);
189
190         rc = qsd_config(valstr, fsname, pool);
191         if (rc)
192                 count = rc;
193 out:
194         return count;
195 }
196 LPROC_SEQ_FOPS(qsd_enabled);
197
198 /* force reintegration procedure to be executed.
199  * Used for test/debugging purpose */
200 static ssize_t
201 lprocfs_force_reint_seq_write(struct file *file, const char __user *buffer,
202                                 size_t count, loff_t *off)
203 {
204         struct seq_file     *m = file->private_data;
205         struct qsd_instance *qsd = m->private;
206         int                  rc = 0, qtype;
207
208         LASSERT(qsd != NULL);
209
210         write_lock(&qsd->qsd_lock);
211         if (qsd->qsd_stopping) {
212                 /* don't mess up with shutdown procedure, it is already
213                  * complicated enough */
214                 rc = -ESHUTDOWN;
215         } else if (!qsd->qsd_prepared) {
216                 rc = -EAGAIN;
217         } else {
218                 /* mark all indexes as stale */
219                 for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
220                         qsd->qsd_type_array[qtype]->qqi_glb_uptodate = false;
221                         qsd->qsd_type_array[qtype]->qqi_slv_uptodate = false;
222                 }
223         }
224         write_unlock(&qsd->qsd_lock);
225
226         if (rc)
227                 return rc;
228
229         /* kick off reintegration */
230         for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
231                 rc = qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
232                 if (rc)
233                         break;
234         }
235         return rc == 0 ? count : rc;
236 }
237 LPROC_SEQ_FOPS_WR_ONLY(qsd, force_reint);
238
239 static int qsd_timeout_seq_show(struct seq_file *m, void *data)
240 {
241         struct qsd_instance *qsd = m->private;
242         LASSERT(qsd != NULL);
243
244         seq_printf(m, "%d\n", qsd_wait_timeout(qsd));
245         return 0;
246 }
247
248 static ssize_t
249 qsd_timeout_seq_write(struct file *file, const char __user *buffer,
250                         size_t count, loff_t *off)
251 {
252         struct seq_file *m = file->private_data;
253         struct qsd_instance *qsd = m->private;
254         time64_t timeout;
255         int rc;
256
257         LASSERT(qsd != NULL);
258         rc = kstrtoll_from_user(buffer, count, 0, &timeout);
259         if (rc)
260                 return rc;
261
262         if (timeout < 0)
263                 return -EINVAL;
264
265         qsd->qsd_timeout = timeout;
266         return count;
267 }
268 LPROC_SEQ_FOPS(qsd_timeout);
269
270 static int qsd_root_prj_enable_seq_show(struct seq_file *m, void *data)
271 {
272         struct qsd_instance *qsd = m->private;
273
274         LASSERT(qsd != NULL);
275         seq_printf(m, "%d\n", qsd->qsd_root_prj_enable);
276         return 0;
277 }
278
279 static ssize_t
280 qsd_root_prj_enable_seq_write(struct file *file, const char __user *buffer,
281                         size_t count, loff_t *off)
282 {
283         struct seq_file *m = file->private_data;
284         struct qsd_instance *qsd = m->private;
285         bool enable;
286         int rc;
287
288         LASSERT(qsd != NULL);
289         rc = kstrtobool_from_user(buffer, count, &enable);
290         if (rc)
291                 return rc;
292
293         qsd->qsd_root_prj_enable = enable;
294         return count;
295 }
296 LPROC_SEQ_FOPS(qsd_root_prj_enable);
297
298 static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
299         { .name =       "info",
300           .fops =       &qsd_state_fops         },
301         { .name =       "enabled",
302           .fops =       &qsd_enabled_fops       },
303         { .name =       "force_reint",
304           .fops =       &qsd_force_reint_fops   },
305         { .name =       "timeout",
306           .fops =       &qsd_timeout_fops       },
307         { .name =       "root_prj_enable",
308           .fops =       &qsd_root_prj_enable_fops       },
309         { NULL }
310 };
311
312 /*
313  * Callback function invoked by the OSP layer when the connection to the master
314  * has been set up.
315  *
316  * \param data - is a pointer to the qsd_instance
317  *
318  * \retval - 0 on success, appropriate error on failure
319  */
320 static int qsd_conn_callback(void *data)
321 {
322         struct qsd_instance *qsd = (struct qsd_instance *)data;
323         int                  type;
324         ENTRY;
325
326         /* qsd_exp should now be valid */
327         LASSERT(qsd->qsd_exp);
328
329         qsd->qsd_ns = class_exp2obd(qsd->qsd_exp)->obd_namespace;
330
331         write_lock(&qsd->qsd_lock);
332         /* notify that qsd_exp is now valid */
333         qsd->qsd_exp_valid = true;
334         write_unlock(&qsd->qsd_lock);
335
336         /* Now that the connection to master is setup, we can initiate the
337          * reintegration procedure for quota types which are enabled.
338          * It is worth noting that, if the qsd_instance hasn't been started
339          * already, then we can only complete the first two steps of the
340          * reintegration procedure (i.e. global lock enqueue and slave
341          * index transfer) since the space usage reconciliation (i.e.
342          * step 3) will have to wait for qsd_start() to be called */
343         for (type = USRQUOTA; type < LL_MAXQUOTAS; type++) {
344                 struct qsd_qtype_info *qqi = qsd->qsd_type_array[type];
345                 struct task_struct *t;
346
347                 /* qqi_reint_task can be set to NULL at any time,
348                  * so we need to be careful.
349                  */
350                 rcu_read_lock();
351                 t = rcu_dereference(qqi->qqi_reint_task);
352                 if (t)
353                         wake_up_process(t);
354                 rcu_read_unlock();
355         }
356
357         RETURN(0);
358 }
359
360 /*
361  * Release qsd_qtype_info structure which contains data associated with a
362  * given quota type. This releases the accounting objects.
363  * It's called on OSD cleanup when the qsd instance is released.
364  *
365  * \param env - is the environment passed by the caller
366  * \param qsd - is the qsd instance managing the qsd_qtype_info structure
367  *              to be released
368  * \param qtype - is the quota type to be shutdown
369  */
370 static void qsd_qtype_fini(const struct lu_env *env, struct qsd_instance *qsd,
371                            int qtype)
372 {
373         struct qsd_qtype_info   *qqi;
374         int repeat = 0;
375         ENTRY;
376
377         if (qsd->qsd_type_array[qtype] == NULL)
378                 RETURN_EXIT;
379         qqi = qsd->qsd_type_array[qtype];
380         qsd->qsd_type_array[qtype] = NULL;
381
382         /* all deferred work lists should be empty */
383         LASSERT(list_empty(&qqi->qqi_deferred_glb));
384         LASSERT(list_empty(&qqi->qqi_deferred_slv));
385
386         /* shutdown lquota site */
387         if (qqi->qqi_site != NULL && !IS_ERR(qqi->qqi_site)) {
388                 lquota_site_free(env, qqi->qqi_site);
389                 qqi->qqi_site = NULL;
390         }
391
392         /* The qqi may still be holding by global locks which are being
393          * canceled asynchronously (LU-4365), see the following steps:
394          *
395          * - On server umount, we try to clear all quota locks first by
396          *   disconnecting LWP (which will invalidate import and cleanup
397          *   all locks on it), however, if quota reint process is holding
398          *   the global lock for reintegration at that time, global lock
399          *   will fail to be cleared on LWP disconnection.
400          *
401          * - Umount process goes on and stops reint process, the global
402          *   lock will be dropped on reint process exit, however, the lock
403          *   cancel in done in asynchronous way, so the
404          *   qsd_glb_blocking_ast() might haven't been called yet when we
405          *   get here.
406          */
407         while (atomic_read(&qqi->qqi_ref) > 1) {
408                 CDEBUG(D_QUOTA, "qqi reference count %u, repeat: %d\n",
409                        atomic_read(&qqi->qqi_ref), repeat);
410                 repeat++;
411                 schedule_timeout_interruptible(cfs_time_seconds(1));
412         }
413
414         /* by now, all qqi users should have gone away */
415         LASSERT(atomic_read(&qqi->qqi_ref) == 1);
416         lu_ref_fini(&qqi->qqi_reference);
417
418         /* release accounting object */
419         if (qqi->qqi_acct_obj != NULL && !IS_ERR(qqi->qqi_acct_obj)) {
420                 dt_object_put(env, qqi->qqi_acct_obj);
421                 qqi->qqi_acct_obj = NULL;
422         }
423
424         /* release slv index */
425         if (qqi->qqi_slv_obj != NULL && !IS_ERR(qqi->qqi_slv_obj)) {
426                 dt_object_put(env, qqi->qqi_slv_obj);
427                 qqi->qqi_slv_obj = NULL;
428                 qqi->qqi_slv_ver = 0;
429         }
430
431         /* release global index */
432         if (qqi->qqi_glb_obj != NULL && !IS_ERR(qqi->qqi_glb_obj)) {
433                 dt_object_put(env, qqi->qqi_glb_obj);
434                 qqi->qqi_glb_obj = NULL;
435                 qqi->qqi_glb_ver = 0;
436         }
437
438         OBD_FREE_PTR(qqi);
439         EXIT;
440 }
441
/*
 * Map a quota type to the name of its accounting procfs entry.
 *
 * \param qtype - quota type (USRQUOTA, GRPQUOTA or PRJQUOTA)
 *
 * \retval - "acct_user", "acct_group" or "acct_project" for the known
 *           types; for any other value, a pointer to a static buffer
 *           holding "acct_unknown_<qtype>" which is overwritten by the
 *           next such call
 */
static const char *qtype2acct_name(int qtype)
{
        static char fallback[24];

        if (qtype == USRQUOTA)
                return "acct_user";
        if (qtype == GRPQUOTA)
                return "acct_group";
        if (qtype == PRJQUOTA)
                return "acct_project";

        snprintf(fallback, sizeof(fallback), "acct_unknown_%u", qtype);
        return fallback;
}
458
/*
 * Map a quota type to the name of the procfs entry exposing its global
 * index copy.
 *
 * \param qtype - quota type (USRQUOTA, GRPQUOTA or PRJQUOTA)
 *
 * \retval - "limit_user", "limit_group" or "limit_project" for the known
 *           types; for any other value, a pointer to a static buffer
 *           holding "limit_unknown_<qtype>" which is overwritten by the
 *           next such call
 */
static const char *qtype2glb_name(int qtype)
{
        /* large enough for "limit_unknown_" plus a 32-bit decimal value */
        static char unknown[32];

        switch (qtype) {
        case USRQUOTA:
                return "limit_user";
        case GRPQUOTA:
                return "limit_group";
        case PRJQUOTA:
                return "limit_project";
        default:
                break;
        }

        /* use the "limit_" prefix so that the fallback never collides with
         * the name qtype2acct_name() produces for the same type */
        snprintf(unknown, sizeof(unknown), "limit_unknown_%u",
                 (unsigned int)qtype);
        return unknown;
}
475
476 /*
477  * Allocate and initialize a qsd_qtype_info structure for quota type \qtype.
478  * This opens the accounting object and initializes the proc file.
479  * It's called on OSD start when the qsd_prepare() is invoked on the qsd
480  * instance.
481  *
482  * \param env  - the environment passed by the caller
483  * \param qsd  - is the qsd instance which will be in charge of the new
484  *               qsd_qtype_info instance.
485  * \param qtype - is quota type to set up
486  *
487  * \retval - 0 on success and qsd->qsd_type_array[qtype] is allocated,
488  *           appropriate error on failure
489  */
490 static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
491                           int qtype)
492 {
493         struct qsd_qtype_info   *qqi;
494         int                      rc;
495         struct obd_uuid          uuid;
496         ENTRY;
497
498         LASSERT(qsd->qsd_type_array[qtype] == NULL);
499
500         /* allocate structure for this quota type */
501         OBD_ALLOC_PTR(qqi);
502         if (qqi == NULL)
503                 RETURN(-ENOMEM);
504         qsd->qsd_type_array[qtype] = qqi;
505         atomic_set(&qqi->qqi_ref, 1); /* referenced from qsd */
506
507         /* set backpointer and other parameters */
508         qqi->qqi_qsd   = qsd;
509         qqi->qqi_qtype = qtype;
510         lu_ref_init(&qqi->qqi_reference);
511         qqi->qqi_glb_uptodate = false;
512         qqi->qqi_slv_uptodate = false;
513         qqi->qqi_reint        = false;
514         INIT_LIST_HEAD(&qqi->qqi_deferred_glb);
515         INIT_LIST_HEAD(&qqi->qqi_deferred_slv);
516         lquota_generate_fid(&qqi->qqi_fid, QSD_RES_TYPE(qsd), qtype);
517
518         /* open accounting object */
519         LASSERT(qqi->qqi_acct_obj == NULL);
520         qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev, qtype);
521         if (IS_ERR(qqi->qqi_acct_obj)) {
522                 CDEBUG(D_QUOTA, "%s: no %s space accounting support: rc = %ld\n",
523                        qsd->qsd_svname, qtype_name(qtype),
524                        PTR_ERR(qqi->qqi_acct_obj));
525                 qqi->qqi_acct_obj = NULL;
526                 qqi->qqi_acct_failed = true;
527         }
528
529         /* open global index copy */
530         LASSERT(qqi->qqi_glb_obj == NULL);
531         qqi->qqi_glb_obj = lquota_disk_glb_find_create(env, qsd->qsd_dev,
532                                                        qsd->qsd_root,
533                                                        &qqi->qqi_fid, true);
534         if (IS_ERR(qqi->qqi_glb_obj)) {
535                 CERROR("%s: can't open global index copy "DFID" %ld\n",
536                        qsd->qsd_svname, PFID(&qqi->qqi_fid),
537                        PTR_ERR(qqi->qqi_glb_obj));
538                 GOTO(out, rc = PTR_ERR(qqi->qqi_glb_obj));
539         }
540         qqi->qqi_glb_ver = dt_version_get(env, qqi->qqi_glb_obj);
541
542         /* open slave index copy */
543         LASSERT(qqi->qqi_slv_obj == NULL);
544         obd_str2uuid(&uuid, qsd->qsd_svname);
545         qqi->qqi_slv_obj = lquota_disk_slv_find_create(env, qsd->qsd_dev,
546                                                        qsd->qsd_root,
547                                                        &qqi->qqi_fid, &uuid,
548                                                        true);
549         if (IS_ERR(qqi->qqi_slv_obj)) {
550                 CERROR("%s: can't open slave index copy "DFID" %ld\n",
551                        qsd->qsd_svname, PFID(&qqi->qqi_fid),
552                        PTR_ERR(qqi->qqi_slv_obj));
553                 GOTO(out, rc = PTR_ERR(qqi->qqi_slv_obj));
554         }
555         qqi->qqi_slv_ver = dt_version_get(env, qqi->qqi_slv_obj);
556
557         /* allocate site */
558         qqi->qqi_site = lquota_site_alloc(env, qqi, false, qtype, &qsd_lqe_ops);
559         if (IS_ERR(qqi->qqi_site)) {
560                 CERROR("%s: can't allocate site "DFID" %ld\n", qsd->qsd_svname,
561                        PFID(&qqi->qqi_fid), PTR_ERR(qqi->qqi_site));
562                 GOTO(out, rc = PTR_ERR(qqi->qqi_site));
563         }
564
565         /* register proc entry for accounting & global index copy objects */
566         rc = lprocfs_seq_create(qsd->qsd_proc, qtype2acct_name(qtype),
567                                 0444, &lprocfs_quota_seq_fops,
568                                 qqi->qqi_acct_obj);
569         if (rc) {
570                 CERROR("%s: can't add procfs entry for accounting file %d\n",
571                        qsd->qsd_svname, rc);
572                 GOTO(out, rc);
573         }
574
575         rc = lprocfs_seq_create(qsd->qsd_proc, qtype2glb_name(qtype),
576                                 0444, &lprocfs_quota_seq_fops,
577                                 qqi->qqi_glb_obj);
578         if (rc) {
579                 CERROR("%s: can't add procfs entry for global index copy %d\n",
580                        qsd->qsd_svname, rc);
581                 GOTO(out, rc);
582         }
583         EXIT;
584 out:
585         if (rc)
586                 qsd_qtype_fini(env, qsd, qtype);
587         return rc;
588 }
589
590 /*
591  * Release a qsd_instance. Companion of qsd_init(). This releases all data
592  * structures associated with the quota slave (on-disk objects, lquota entry
593  * tables, ...).
594  * This function should be called when the OSD is shutting down.
595  *
596  * \param env - is the environment passed by the caller
597  * \param qsd - is the qsd instance to shutdown
598  */
599 void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
600 {
601         int     qtype;
602         ENTRY;
603
604         if (unlikely(qsd == NULL))
605                 RETURN_EXIT;
606
607         CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname);
608         write_lock(&qsd->qsd_lock);
609         qsd->qsd_stopping = true;
610         write_unlock(&qsd->qsd_lock);
611
612         /* remove qsd proc entry */
613         if (qsd->qsd_proc != NULL) {
614                 lprocfs_remove(&qsd->qsd_proc);
615                 qsd->qsd_proc = NULL;
616         }
617
618         /* stop the writeback thread */
619         qsd_stop_upd_thread(qsd);
620
621         /* shutdown the reintegration threads */
622         for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
623                 if (qsd->qsd_type_array[qtype] == NULL)
624                         continue;
625                 qsd_stop_reint_thread(qsd->qsd_type_array[qtype]);
626         }
627
628         if (qsd->qsd_ns != NULL) {
629                 qsd->qsd_ns = NULL;
630         }
631
632         /* release per-filesystem information */
633         if (qsd->qsd_fsinfo != NULL) {
634                 mutex_lock(&qsd->qsd_fsinfo->qfs_mutex);
635                 /* remove from the list of fsinfo */
636                 list_del_init(&qsd->qsd_link);
637                 mutex_unlock(&qsd->qsd_fsinfo->qfs_mutex);
638                 qsd_put_fsinfo(qsd->qsd_fsinfo);
639                 qsd->qsd_fsinfo = NULL;
640         }
641
642         /* free per-quota type data */
643         for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++)
644                 qsd_qtype_fini(env, qsd, qtype);
645
646         /* deregister connection to the quota master */
647         qsd->qsd_exp_valid = false;
648         lustre_deregister_lwp_item(&qsd->qsd_exp);
649
650         /* release quota root directory */
651         if (qsd->qsd_root != NULL) {
652                 dt_object_put(env, qsd->qsd_root);
653                 qsd->qsd_root = NULL;
654         }
655
656         /* release reference on dt_device */
657         if (qsd->qsd_dev != NULL) {
658                 lu_ref_del(&qsd->qsd_dev->dd_lu_dev.ld_reference, "qsd", qsd);
659                 lu_device_put(&qsd->qsd_dev->dd_lu_dev);
660                 qsd->qsd_dev = NULL;
661         }
662
663         CDEBUG(D_QUOTA, "%s: QSD shutdown completed\n", qsd->qsd_svname);
664         OBD_FREE_PTR(qsd);
665         EXIT;
666 }
667 EXPORT_SYMBOL(qsd_fini);
668
669 /*
670  * Create a new qsd_instance to be associated with backend osd device
671  * identified by \dev.
672  *
673  * \param env    - the environment passed by the caller
674  * \param svname - is the service name of the OSD device creating this instance
675  * \param dev    - is the dt_device where to store quota index files
676  * \param osd_proc - is the procfs parent directory where to create procfs file
677  *                   related to this new qsd instance
678  *
679  * \retval - pointer to new qsd_instance associated with dev \dev on success,
680  *           appropriate error on failure
681  */
682 struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
683                               struct dt_device *dev,
684                               struct proc_dir_entry *osd_proc,
685                               bool is_md, bool excl)
686 {
687         struct qsd_thread_info  *qti = qsd_info(env);
688         struct qsd_instance     *qsd;
689         int                      rc, type, idx;
690         ENTRY;
691
692         /* only configure qsd for MDT & OST */
693         type = server_name2index(svname, &idx, NULL);
694         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST)
695                 RETURN(NULL);
696
697         /* allocate qsd instance */
698         OBD_ALLOC_PTR(qsd);
699         if (qsd == NULL)
700                 RETURN(ERR_PTR(-ENOMEM));
701
702         /* generic initializations */
703         rwlock_init(&qsd->qsd_lock);
704         INIT_LIST_HEAD(&qsd->qsd_link);
705         INIT_LIST_HEAD(&qsd->qsd_upd_list);
706         spin_lock_init(&qsd->qsd_adjust_lock);
707         INIT_LIST_HEAD(&qsd->qsd_adjust_list);
708         qsd->qsd_prepared = false;
709         qsd->qsd_started = false;
710         qsd->qsd_is_md = is_md;
711         qsd->qsd_updating = false;
712         qsd->qsd_exclusive = excl;
713
714         /* copy service name */
715         if (strlcpy(qsd->qsd_svname, svname, sizeof(qsd->qsd_svname))
716             >= sizeof(qsd->qsd_svname))
717                 GOTO(out, rc = -E2BIG);
718
719         /* grab reference on osd device */
720         lu_device_get(&dev->dd_lu_dev);
721         lu_ref_add(&dev->dd_lu_dev.ld_reference, "qsd", qsd);
722         qsd->qsd_dev = dev;
723
724         /* get fsname from svname */
725         rc = server_name2fsname(svname, qti->qti_buf, NULL);
726         if (rc) {
727                 CERROR("%s: fail to extract filesystem name\n", svname);
728                 GOTO(out, rc);
729         }
730
731         /* look up quota setting for the filesystem the target belongs to */
732         qsd->qsd_fsinfo = qsd_get_fsinfo(qti->qti_buf, 1);
733         if (qsd->qsd_fsinfo == NULL) {
734                 CERROR("%s: failed to locate filesystem information\n", svname);
735                 GOTO(out, rc = -EINVAL);
736         }
737
738         /* add in the list of lquota_fsinfo */
739         mutex_lock(&qsd->qsd_fsinfo->qfs_mutex);
740         list_add_tail(&qsd->qsd_link, &qsd->qsd_fsinfo->qfs_qsd_list);
741         mutex_unlock(&qsd->qsd_fsinfo->qfs_mutex);
742
743         /* register procfs directory */
744         if (qsd->qsd_is_md)
745                 qsd->qsd_proc = lprocfs_register(QSD_DIR_MD, osd_proc,
746                                                  lprocfs_quota_qsd_vars, qsd);
747         else
748                 qsd->qsd_proc = lprocfs_register(QSD_DIR_DT, osd_proc,
749                                                  lprocfs_quota_qsd_vars, qsd);
750
751         if (type == LDD_F_SV_TYPE_MDT && qsd->qsd_is_md)
752                 lprocfs_add_symlink(QSD_DIR, osd_proc, "./%s", QSD_DIR_MD);
753         else if (type == LDD_F_SV_TYPE_OST && !qsd->qsd_is_md)
754                 lprocfs_add_symlink(QSD_DIR, osd_proc, "./%s", QSD_DIR_DT);
755
756         if (IS_ERR(qsd->qsd_proc)) {
757                 rc = PTR_ERR(qsd->qsd_proc);
758                 qsd->qsd_proc = NULL;
759                 CERROR("%s: fail to create quota slave proc entry (%d)\n",
760                        svname, rc);
761                 GOTO(out, rc);
762         }
763         EXIT;
764 out:
765         if (rc) {
766                 qsd_fini(env, qsd);
767                 return ERR_PTR(rc);
768         }
769         RETURN(qsd);
770 }
771 EXPORT_SYMBOL(qsd_init);
772
773 /*
774  * Initialize on-disk structures in order to manage quota enforcement for
775  * the target associated with the qsd instance \qsd and starts the reintegration
776  * procedure for each quota type as soon as possible.
777  * The last step of the reintegration will be completed once qsd_start() is
778  * called, at which points the space reconciliation with the master will be
779  * executed.
780  * This function must be called when the server stack is fully configured,
781  * typically when ->ldo_prepare is called across the stack.
782  *
783  * \param env - the environment passed by the caller
784  * \param qsd - is qsd_instance to prepare
785  *
786  * \retval - 0 on success, appropriate error on failure
787  */
788 int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
789 {
790         struct qsd_thread_info  *qti = qsd_info(env);
791         int                      qtype, rc = 0;
792         ENTRY;
793
794         if (unlikely(qsd == NULL))
795                 RETURN(0);
796
797         read_lock(&qsd->qsd_lock);
798         if (qsd->qsd_prepared) {
799                 CERROR("%s: qsd instance already prepared\n", qsd->qsd_svname);
800                 rc = -EALREADY;
801         }
802         read_unlock(&qsd->qsd_lock);
803         if (rc)
804                 RETURN(rc);
805
806         /* Record whether this qsd instance is managing quota enforcement for a
807          * MDT (i.e. inode quota) or OST (block quota) */
808         if (qsd->qsd_is_md)
809                 qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_MD);
810         else
811                 qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_DT);
812
813         /* look-up on-disk directory for the quota slave */
814         qsd->qsd_root = lquota_disk_dir_find_create(env, qsd->qsd_dev, NULL,
815                                                     QSD_DIR);
816         if (IS_ERR(qsd->qsd_root)) {
817                 rc = PTR_ERR(qsd->qsd_root);
818                 qsd->qsd_root = NULL;
819                 CERROR("%s: failed to create quota slave root dir (%d)\n",
820                        qsd->qsd_svname, rc);
821                 RETURN(rc);
822         }
823
824         /* initialize per-quota type data */
825         for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
826                 rc = qsd_qtype_init(env, qsd, qtype);
827                 if (rc)
828                         RETURN(rc);
829         }
830
831         /* pools successfully setup, mark the qsd as prepared */
832         write_lock(&qsd->qsd_lock);
833         qsd->qsd_prepared = true;
834         write_unlock(&qsd->qsd_lock);
835
836         if (qsd->qsd_dev->dd_rdonly)
837                 RETURN(0);
838
839         /* start reintegration thread for each type, if required */
840         for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
841                 struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
842
843                 if (qsd_type_enabled(qsd, qtype) &&
844                     qqi->qqi_acct_failed) {
845                         LCONSOLE_ERROR("%s: can't enable quota enforcement "
846                                        "since space accounting isn't functional"
847                                        ". Please run tunefs.lustre --quota on "
848                                        "an unmounted filesystem if not done "
849                                        "already\n", qsd->qsd_svname);
850                         continue;
851                 }
852
853                 rc = qsd_start_reint_thread(qqi);
854                 if (rc) {
855                         CERROR("%s: failed to start reint thread for type %s: rc = %d\n",
856                                 qsd->qsd_svname, qtype_name(qtype), rc);
857                         RETURN(rc);
858                 }
859         }
860
861         /* start writeback thread */
862         rc = qsd_start_upd_thread(qsd);
863         if (rc) {
864                 CERROR("%s: failed to start writeback thread (%d)\n",
865                        qsd->qsd_svname, rc);
866                 RETURN(rc);
867         }
868
869         /* generate osp name */
870         rc = tgt_name2lwp_name(qsd->qsd_svname, qti->qti_buf,
871                                MTI_NAME_MAXLEN, 0);
872         if (rc) {
873                 CERROR("%s: failed to generate ospname (%d)\n",
874                        qsd->qsd_svname, rc);
875                 RETURN(rc);
876         }
877
878         /* the connection callback will start the reintegration
879          * procedure if quota is enabled */
880         rc = lustre_register_lwp_item(qti->qti_buf, &qsd->qsd_exp,
881                                       qsd_conn_callback, (void *)qsd);
882         if (rc) {
883                 CERROR("%s: fail to get connection to master (%d)\n",
884                        qsd->qsd_svname, rc);
885                 RETURN(rc);
886         }
887
888         RETURN(0);
889 }
890 EXPORT_SYMBOL(qsd_prepare);
891
892 /*
893  * Start a qsd instance. This will complete the last step of the reintegration
894  * procedure as soon as possible (provided that the master is reachable).
895  * This should be called when recovery has been completed and quota should now
896  * be enforced on every operations.
897  *
898  * \param env - the environment passed by the caller
899  * \param qsd - is the qsd instance associated with the osd device to start
900  */
901 int qsd_start(const struct lu_env *env, struct qsd_instance *qsd)
902 {
903         int     type, rc = 0;
904         ENTRY;
905
906         if (unlikely(qsd == NULL))
907                 RETURN(0);
908
909         write_lock(&qsd->qsd_lock);
910         if (!qsd->qsd_prepared) {
911                 CERROR("%s: can't start qsd instance since it wasn't properly "
912                        "initialized\n", qsd->qsd_svname);
913                 rc = -EFAULT;
914         } else if (qsd->qsd_started) {
915                 CERROR("%s: qsd instance already started\n", qsd->qsd_svname);
916                 rc = -EALREADY;
917         } else {
918                 /* notify that the qsd_instance is now started */
919                 qsd->qsd_started = true;
920         }
921         write_unlock(&qsd->qsd_lock);
922
923         if (rc)
924                 RETURN(rc);
925
926         /* Trigger the 3rd step of reintegration: If usage > granted, acquire
927          * up to usage; If usage < granted, release down to usage.  */
928         for (type = USRQUOTA; type < LL_MAXQUOTAS; type++) {
929                 struct qsd_qtype_info   *qqi = qsd->qsd_type_array[type];
930                 struct task_struct *t;
931
932                 /* qqi_reint_task can be set to NULL at any time,
933                  * so we need to be careful.
934                  */
935                 rcu_read_lock();
936                 t = rcu_dereference(qqi->qqi_reint_task);
937                 if (t)
938                         wake_up_process(t);
939                 rcu_read_unlock();
940         }
941
942         RETURN(rc);
943 }
944 EXPORT_SYMBOL(qsd_start);
945
/*
 * Registration hook for the quota configuration handler, used by
 * qsd_glb_init() and qsd_glb_fini() below.
 * NOTE(review): declared locally rather than through a header; presumably
 * defined elsewhere in the tree - confirm and consider moving the prototype
 * to a shared header.
 */
void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg));
947
948 /*
949  * Global initialization performed at module load time
950  */
951 int qsd_glb_init(void)
952 {
953         int     rc;
954
955         rc = lu_kmem_init(qsd_caches);
956         if (rc)
957                 return rc;
958
959         qsd_key_init_generic(&qsd_thread_key, NULL);
960         lu_context_key_register(&qsd_thread_key);
961         lustre_register_quota_process_config(qsd_process_config);
962
963         return 0;
964 }
965
/*
 * Companion of qsd_glb_init() called at module unload time.
 * Undoes each of the steps performed by qsd_glb_init().
 */
void qsd_glb_fini(void)
{
        /* replace the config handler installed by qsd_glb_init() with NULL */
        lustre_register_quota_process_config(NULL);
        /* NOTE(review): the caches are released before the context key is
         * deregistered, which is not the exact reverse of qsd_glb_init();
         * confirm that nothing run at key deregistration uses qsd_caches */
        lu_kmem_fini(qsd_caches);
        lu_context_key_degister(&qsd_thread_key);
}