Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / quota / qmt_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * Management of the device associated with a Quota Master Target (QMT).
33  *
34  * The QMT holds the cluster wide quota limits. It stores the quota settings
35  * ({hard,soft} limit & grace time) in a global index file and is in charge
36  * of allocating quota space to slaves while guaranteeing that the overall
37  * limits aren't exceeded. The QMT also maintains one index per slave (in fact,
38  * one per slave per quota type) used to track how much space is allocated
39  * to a given slave. Now that the QMT is aware of the quota space distribution
40  * among slaves, it can afford to rebalance efficiently quota space from one
41  * slave to another. Slaves are asked to release quota space via glimpse
42  * callbacks sent on DLM locks which are granted to slaves when those latters
43  * acquire quota space.
44  *
45  * The QMT device is currently set up by the MDT and should probably be moved
46  * to a separate target in the future. Meanwhile, the MDT forwards all quota
47  * requests to the QMT via a list of request handlers (see struct qmt_handlers
48  * in lustre_quota.h). The QMT also borrows the LDLM namespace from the MDT.
49  *
50  * To bring up a QMT device, the following steps must be completed:
51  *
52  * - call ->ldto_device_alloc to allocate the QMT device and perform basic
53  *   initialization like connecting to the backend OSD device or setting up the
54  *   default pools and the QMT procfs directory.
55  *
56  * - the MDT can then connect to the QMT instance via legacy obd_connect path.
57  *
58  * - once the MDT stack has been fully configured, ->ldto_prepare must be called
59  *   to configure on-disk objects associated with this master target.
60  *
61  * To shutdown a QMT device, the MDT just has to disconnect from the QMT.
62  *
63  * The qmt_device_type structure is registered when the lquota module is
64  * loaded and all the steps described above are automatically done when the MDT
65  * set up the Quota Master Target via calls to class_attach/class_setup, see
66  * mdt_quota_init() for more details.
67  */
68
69 #define DEBUG_SUBSYSTEM S_LQUOTA
70
71 #include <obd_class.h>
72 #include <lprocfs_status.h>
73 #include <lustre_disk.h>
74 #include "qmt_internal.h"
75
76 static const struct lu_device_operations qmt_lu_ops;
77
78 /*
79  * Release quota master target and all data structure associated with this
80  * target.
81  * Called on MDT0 cleanup.
82  *
83  * \param env - is the environment passed by the caller
84  * \param ld  - is the lu_device associated with the qmt device to be released
85  *
86  * \retval - NULL on success (backend OSD device is managed by the main stack),
87  *           appropriate error on failure
88  */
89 static struct lu_device *qmt_device_fini(const struct lu_env *env,
90                                          struct lu_device *ld)
91 {
92         struct qmt_device       *qmt = lu2qmt_dev(ld);
93         ENTRY;
94
95         LASSERT(qmt != NULL);
96
97         CDEBUG(D_QUOTA, "%s: initiating QMT shutdown\n", qmt->qmt_svname);
98         qmt->qmt_stopping = true;
99
100         if (qmt_lvbo_free_wq) {
101                 destroy_workqueue(qmt_lvbo_free_wq);
102                 qmt_lvbo_free_wq = NULL;
103         }
104
105         /* kill pool instances, if any */
106         qmt_pool_fini(env, qmt);
107
108         /* remove qmt proc entry */
109         if (qmt->qmt_proc != NULL && !IS_ERR(qmt->qmt_proc)) {
110                 lprocfs_remove(&qmt->qmt_proc);
111                 qmt->qmt_proc = NULL;
112         }
113
114         /* stop rebalance thread */
115         if (!qmt->qmt_child->dd_rdonly)
116                 qmt_stop_reba_thread(qmt);
117
118         if (qmt->qmt_root) {
119                 dt_object_put(env, qmt->qmt_root);
120                 qmt->qmt_root = NULL;
121         }
122
123         /* disconnect from OSD */
124         if (qmt->qmt_child_exp != NULL) {
125                 obd_disconnect(qmt->qmt_child_exp);
126                 qmt->qmt_child_exp = NULL;
127                 qmt->qmt_child = NULL;
128         }
129
130         /* clear references to MDT namespace */
131         ld->ld_obd->obd_namespace = NULL;
132         qmt->qmt_ns = NULL;
133
134         RETURN(NULL);
135 }
136
137 /*
138  * Connect a quota master to the backend OSD device.
139  *
140  * \param env - is the environment passed by the caller
141  * \param qmt - is the quota master target to be connected
142  * \param cfg - is the configuration log record from which we need to extract
143  *              the service name of the backend OSD device to connect to.
144  *
145  * \retval - 0 on success, appropriate error on failure
146  */
147 static int qmt_connect_to_osd(const struct lu_env *env, struct qmt_device *qmt,
148                               struct lustre_cfg *cfg)
149 {
150         struct obd_connect_data *data = NULL;
151         struct obd_device       *obd;
152         struct lu_device        *ld = qmt2lu_dev(qmt);
153         int                      rc;
154         ENTRY;
155
156         LASSERT(qmt->qmt_child_exp == NULL);
157
158         OBD_ALLOC_PTR(data);
159         if (data == NULL)
160                 GOTO(out, rc = -ENOMEM);
161
162         /* look-up OBD device associated with the backend OSD device.
163          * The MDT is kind enough to pass the OBD name in QMT configuration */
164         obd = class_name2obd(lustre_cfg_string(cfg, 3));
165         if (obd == NULL) {
166                 CERROR("%s: can't locate backend osd device: %s\n",
167                        qmt->qmt_svname, lustre_cfg_string(cfg, 3));
168                 GOTO(out, rc = -ENOTCONN);
169         }
170
171         data->ocd_connect_flags = OBD_CONNECT_VERSION;
172         data->ocd_version = LUSTRE_VERSION_CODE;
173
174         /* connect to OSD device */
175         rc = obd_connect(NULL, &qmt->qmt_child_exp, obd, &obd->obd_uuid, data,
176                          NULL);
177         if (rc) {
178                 CERROR("%s: cannot connect to osd dev %s (%d)\n",
179                        qmt->qmt_svname, obd->obd_name, rc);
180                 GOTO(out, rc);
181         }
182
183         /* initialize site (although it isn't used anywhere) and lu_device
184          * pointer to next device */
185         qmt->qmt_child = lu2dt_dev(qmt->qmt_child_exp->exp_obd->obd_lu_dev);
186         ld->ld_site = qmt->qmt_child_exp->exp_obd->obd_lu_dev->ld_site;
187         EXIT;
188 out:
189         if (data)
190                 OBD_FREE_PTR(data);
191         return rc;
192 }
193
194 /*
195  * Initialize quota master target device. This includers connecting to
196  * the backend OSD device, initializing the pool configuration and creating the
197  * root procfs directory dedicated to this quota target.
198  * The rest of the initialization is done when the stack is fully configured
199  * (i.e. when ->ldo_start is called across the stack).
200  *
201  * This function is called on MDT0 setup.
202  *
203  * \param env - is the environment passed by the caller
204  * \param qmt - is the quota master target to be initialized
205  * \param ldt - is the device type structure associated with the qmt device
206  * \param cfg - is the configuration record used to configure the qmt device
207  *
208  * \retval - 0 on success, appropriate error on failure
209  */
210 static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt,
211                             struct lu_device_type *ldt, struct lustre_cfg *cfg)
212 {
213         struct lu_device        *ld = qmt2lu_dev(qmt);
214         struct obd_device       *obd, *mdt_obd;
215         struct obd_type         *type;
216         char                    *svname = lustre_cfg_string(cfg, 0);
217         int                      rc;
218         ENTRY;
219
220         if (svname == NULL)
221                 RETURN(-EINVAL);
222
223         /* record who i am, it might be useful ... */
224         rc = strscpy(qmt->qmt_svname, svname, sizeof(qmt->qmt_svname));
225         if (rc < 0)
226                 RETURN(rc);
227
228         /* look-up the obd_device associated with the qmt */
229         obd = class_name2obd(qmt->qmt_svname);
230         if (obd == NULL)
231                 RETURN(-ENOENT);
232
233         /* reference each other */
234         obd->obd_lu_dev = ld;
235         ld->ld_obd      = obd;
236
237         /* look-up the parent MDT to steal its ldlm namespace ... */
238         mdt_obd = class_name2obd(lustre_cfg_string(cfg, 2));
239         if (mdt_obd == NULL)
240                 RETURN(-ENOENT);
241
242         /* borrow  MDT namespace. kind of a hack until we have our own namespace
243          * & service threads */
244         LASSERT(mdt_obd->obd_namespace != NULL);
245         obd->obd_namespace = mdt_obd->obd_namespace;
246         qmt->qmt_ns = obd->obd_namespace;
247
248         /* connect to backend osd device */
249         rc = qmt_connect_to_osd(env, qmt, cfg);
250         if (rc)
251                 GOTO(out, rc);
252
253         /* set up and start rebalance thread */
254         INIT_LIST_HEAD(&qmt->qmt_reba_list);
255         spin_lock_init(&qmt->qmt_reba_lock);
256         if (!qmt->qmt_child->dd_rdonly) {
257                 rc = qmt_start_reba_thread(qmt);
258                 if (rc) {
259                         CERROR("%s: failed to start rebalance thread (%d)\n",
260                                qmt->qmt_svname, rc);
261                         GOTO(out, rc);
262                 }
263         }
264
265         /* at the moment there is no linkage between lu_type and obd_type, so
266          * we lookup obd_type this way */
267         type = class_search_type(LUSTRE_QMT_NAME);
268         LASSERT(type != NULL);
269
270         /* put reference taken by class_search_type */
271         kobject_put(&type->typ_kobj);
272
273         /* register proc directory associated with this qmt */
274         qmt->qmt_proc = lprocfs_register(qmt->qmt_svname, type->typ_procroot,
275                                          NULL, NULL);
276         if (IS_ERR(qmt->qmt_proc)) {
277                 rc = PTR_ERR(qmt->qmt_proc);
278                 CERROR("%s: failed to create qmt proc entry (%d)\n",
279                        qmt->qmt_svname, rc);
280                 GOTO(out, rc);
281         }
282
283         /* initialize pool configuration */
284         rc = qmt_pool_init(env, qmt);
285         if (rc)
286                 GOTO(out, rc);
287
288         qmt_lvbo_free_wq = alloc_workqueue("qmt_lvbo_free", WQ_UNBOUND, 0);
289         if (!qmt_lvbo_free_wq) {
290                 rc = -ENOMEM;
291                 CERROR("%s: failed to start qmt_lvbo_free workqueue: rc = %d\n",
292                        qmt->qmt_svname, rc);
293                 goto out;
294         }
295
296         EXIT;
297 out:
298         if (rc)
299                 qmt_device_fini(env, ld);
300         return rc;
301 }
302
303 /*
304  * Free quota master target device. Companion of qmt_device_alloc()
305  *
306  * \param env - is the environment passed by the caller
307  * \param ld  - is the lu_device associated with the qmt dev to be freed
308  *
309  * \retval - NULL on success (backend OSD device is managed by the main stack),
310  *           appropriate error on failure
311  */
312 static struct lu_device *qmt_device_free(const struct lu_env *env,
313                                          struct lu_device *ld)
314 {
315         struct qmt_device       *qmt = lu2qmt_dev(ld);
316         ENTRY;
317
318         LASSERT(qmt != NULL);
319
320         lu_device_fini(ld);
321         OBD_FREE_PTR(qmt);
322         RETURN(NULL);
323 }
324
325 /*
326  * Allocate quota master target and initialize it.
327  *
328  * \param env - is the environment passed by the caller
329  * \param ldt - is the device type structure associated with the qmt
330  * \param cfg - is the configuration record used to configure the qmt
331  *
332  * \retval - lu_device structure associated with the qmt on success,
333  *           appropriate error on failure
334  */
335 static struct lu_device *qmt_device_alloc(const struct lu_env *env,
336                                           struct lu_device_type *ldt,
337                                           struct lustre_cfg *cfg)
338 {
339         struct qmt_device       *qmt;
340         struct lu_device        *ld;
341         int                      rc;
342         ENTRY;
343
344         /* allocate qmt device */
345         OBD_ALLOC_PTR(qmt);
346         if (qmt == NULL)
347                 RETURN(ERR_PTR(-ENOMEM));
348
349         /* configure lu/dt_device */
350         ld = qmt2lu_dev(qmt);
351         dt_device_init(&qmt->qmt_dt_dev, ldt);
352         ld->ld_ops = &qmt_lu_ops;
353
354         /* initialize qmt device */
355         rc = qmt_device_init0(env, qmt, ldt, cfg);
356         if (rc != 0) {
357                 qmt_device_free(env, ld);
358                 RETURN(ERR_PTR(rc));
359         }
360
361         RETURN(ld);
362 }
363
364 LU_KEY_INIT_FINI(qmt, struct qmt_thread_info);
365 LU_TYPE_INIT_FINI(qmt, &qmt_thread_key);
366 LU_CONTEXT_KEY_DEFINE(qmt, LCT_MD_THREAD);
367
368 /*
369  * lu device type operations associated with the master target.
370  */
371 static const struct lu_device_type_operations qmt_device_type_ops = {
372         .ldto_init              = qmt_type_init,
373         .ldto_fini              = qmt_type_fini,
374
375         .ldto_start             = qmt_type_start,
376         .ldto_stop              = qmt_type_stop,
377
378         .ldto_device_alloc      = qmt_device_alloc,
379         .ldto_device_free       = qmt_device_free,
380
381         .ldto_device_fini       = qmt_device_fini,
382 };
383
384 /*
385  * lu device type structure associated with the master target.
386  * MDT0 uses this structure to configure the qmt.
387  */
388 static struct lu_device_type qmt_device_type = {
389         .ldt_tags       = LU_DEVICE_DT,
390         .ldt_name       = LUSTRE_QMT_NAME,
391         .ldt_ops        = &qmt_device_type_ops,
392         .ldt_ctx_tags   = LCT_MD_THREAD,
393 };
394
395 /*
396  * obd_connect handler used by the MDT to connect to the master target.
397  */
398 static int qmt_device_obd_connect(const struct lu_env *env,
399                                   struct obd_export **exp,
400                                   struct obd_device *obd,
401                                   struct obd_uuid *cluuid,
402                                   struct obd_connect_data *data,
403                                   void *localdata)
404 {
405         struct lustre_handle    conn;
406         int                     rc;
407         ENTRY;
408
409         rc = class_connect(&conn, obd, cluuid);
410         if (rc)
411                 RETURN(rc);
412
413         *exp = class_conn2export(&conn);
414         RETURN(0);
415 }
416
417 /*
418  * obd_disconnect handler used by the MDT to disconnect from the master target.
419  * We trigger cleanup on disconnect since it means that the MDT is about to
420  * shutdown.
421  */
422 static int qmt_device_obd_disconnect(struct obd_export *exp)
423 {
424         struct obd_device       *obd = exp->exp_obd;
425         int                      rc;
426         ENTRY;
427
428         rc = class_disconnect(exp);
429         if (rc)
430                 RETURN(rc);
431
432         rc = class_manual_cleanup(obd);
433         RETURN(0);
434 }
435
436 /*
437  * obd device operations associated with the master target.
438  */
439 static const struct obd_ops qmt_obd_ops = {
440         .o_owner        = THIS_MODULE,
441         .o_connect      = qmt_device_obd_connect,
442         .o_disconnect   = qmt_device_obd_disconnect,
443         .o_pool_new     = qmt_pool_new,
444         .o_pool_rem     = qmt_pool_rem,
445         .o_pool_add     = qmt_pool_add,
446         .o_pool_del     = qmt_pool_del,
447 };
448
449 /*
450  * Called when the MDS is fully configured. We use it to set up local objects
451  * associated with the quota master target.
452  *
453  * \param env - is the environment passed by the caller
454  * \param parent - is the lu_device of the parent, that's to say the mdt
455  * \param ld  - is the lu_device associated with the master target
456  *
457  * \retval    - 0 on success, appropriate error on failure
458  */
459 static int qmt_device_prepare(const struct lu_env *env,
460                               struct lu_device *parent,
461                               struct lu_device *ld)
462 {
463         struct qmt_device       *qmt = lu2qmt_dev(ld);
464         struct dt_object        *qmt_root;
465         int                      rc;
466         ENTRY;
467
468         /* initialize quota master root directory where all index files will be
469          * stored */
470         qmt_root = lquota_disk_dir_find_create(env, qmt->qmt_child, NULL,
471                                                QMT_DIR);
472         if (IS_ERR(qmt_root)) {
473                 rc = PTR_ERR(qmt_root);
474                 CERROR("%s: failed to create master quota directory (%d)\n",
475                        qmt->qmt_svname, rc);
476                 RETURN(rc);
477         }
478
479         qmt->qmt_root = qmt_root;
480         /* initialize on-disk indexes associated with each pool */
481         rc = qmt_pool_prepare(env, qmt, qmt_root, NULL);
482         RETURN(rc);
483 }
484
485 /*
486  * lu device operations for the quota master target
487  */
488 static const struct lu_device_operations qmt_lu_ops = {
489         .ldo_prepare            = qmt_device_prepare,
490         .ldo_process_config     = NULL, /* to be defined for dynamic pool
491                                          * configuration */
492 };
493
494 /* global variable initialization called when the lquota module is loaded */
495 int qmt_glb_init(void)
496 {
497         int rc;
498         ENTRY;
499
500         rc = class_register_type(&qmt_obd_ops, NULL, true,
501                                  LUSTRE_QMT_NAME, &qmt_device_type);
502         RETURN(rc);
503 }
504
505 /* called when the lquota module is about to be unloaded */
506 void qmt_glb_fini(void)
507 {
508         class_unregister_type(LUSTRE_QMT_NAME);
509 }