Whamcloud - gitweb
LU-10181 mds: init cpt params for mdt IO service
[fs/lustre-release.git] / lustre / mdt / mdt_mds.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2013, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_mds.c
32  *
33  * Lustre Metadata Service Layer
34  *
35  * Author: Di Wang <di.wang@whamcloud.com>
36  **/
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 #include <obd_support.h>
43 /* struct ptlrpc_request */
44 #include <lustre_net.h>
45 /* struct obd_export */
46 #include <lustre_export.h>
47 /* struct obd_device */
48 #include <obd.h>
49 /* lu2dt_dev() */
50 #include <dt_object.h>
51 #include <lustre_mds.h>
52 #include "mdt_internal.h"
53 #include <lustre_quota.h>
54 #include <lustre_acl.h>
55 #include <uapi/linux/lustre/lustre_param.h>
56
/*
 * Per-device state of the MDS layer: the embedded md_device plus one handle
 * for every ptlrpc service registered by mds_start_ptlrpc_service().
 * Each service pointer is reset to NULL when its registration fails and
 * when mds_stop_ptlrpc_service() unregisters it.
 */
struct mds_device {
        /* super-class */
        struct md_device         mds_md_dev;
        /* regular metadata requests (MDS_REQUEST_PORTAL) */
        struct ptlrpc_service   *mds_regular_service;
        /* readpage requests (MDS_READPAGE_PORTAL) */
        struct ptlrpc_service   *mds_readpage_service;
        /* object update ("_out") requests (OUT_PORTAL) */
        struct ptlrpc_service   *mds_out_service;
        /* setattr requests, kept for old clients (MDS_SETATTR_PORTAL) */
        struct ptlrpc_service   *mds_setattr_service;
        /* sequence controller service (SEQ_CONTROLLER_PORTAL) */
        struct ptlrpc_service   *mds_mdsc_service;
        /* metadata sequence server service (SEQ_METADATA_PORTAL) */
        struct ptlrpc_service   *mds_mdss_service;
        /* FLD service (FLD_REQUEST_PORTAL) */
        struct ptlrpc_service   *mds_fld_service;
        /* MDT IO service (MDS_IO_PORTAL) */
        struct ptlrpc_service   *mds_io_service;
        /* held while the services are unregistered or health-checked */
        struct mutex             mds_health_mutex;
};
70
/*
 * Module parameters. All are exported with mode 0444 and are consumed by
 * mds_start_ptlrpc_service() when it fills in the service configurations.
 *
 * mds_num_threads feeds tc_nthrs_user of the regular, "_out" and "_io"
 * services.
 */
static unsigned long mds_num_threads;
module_param(mds_num_threads, ulong, 0444);
MODULE_PARM_DESC(mds_num_threads, "number of MDS service threads to start");

/* upper bound (tc_nthrs_max) for the "_io" service; not static, so it is
 * visible outside this file */
int mds_max_io_threads = 512;
module_param(mds_max_io_threads, int, 0444);
MODULE_PARM_DESC(mds_max_io_threads, "maximum number of MDS IO service threads");

/* CPT pattern for the "_io" service; only used when no private
 * mdt_io_cptable was built */
static char *mds_io_num_cpts;
module_param(mds_io_num_cpts, charp, 0444);
MODULE_PARM_DESC(mds_io_num_cpts,
                 "CPU partitions MDS IO threads should run on");

/* private CPT table for the "_io" service: allocated in
 * mds_start_ptlrpc_service(), freed in mds_stop_ptlrpc_service() */
static struct cfs_cpt_table *mdt_io_cptable;

/* CPT pattern for the regular and "_out" services */
static char *mds_num_cpts;
module_param(mds_num_cpts, charp, 0444);
MODULE_PARM_DESC(mds_num_cpts, "CPU partitions MDS threads should run on");

/* thread count and CPT pattern for the readpage service */
static unsigned long mds_rdpg_num_threads;
module_param(mds_rdpg_num_threads, ulong, 0444);
MODULE_PARM_DESC(mds_rdpg_num_threads,
                 "number of MDS readpage service threads to start");

static char *mds_rdpg_num_cpts;
module_param(mds_rdpg_num_cpts, charp, 0444);
MODULE_PARM_DESC(mds_rdpg_num_cpts,
                 "CPU partitions MDS readpage threads should run on");

/* NB: these two should be removed along with setattr service in the future */
static unsigned long mds_attr_num_threads;
module_param(mds_attr_num_threads, ulong, 0444);
MODULE_PARM_DESC(mds_attr_num_threads,
                 "number of MDS setattr service threads to start");

static char *mds_attr_num_cpts;
module_param(mds_attr_num_cpts, charp, 0444);
MODULE_PARM_DESC(mds_attr_num_cpts,
                 "CPU partitions MDS setattr threads should run on");
113
114 /* device init/fini methods */
115 static void mds_stop_ptlrpc_service(struct mds_device *m)
116 {
117         ENTRY;
118
119         mutex_lock(&m->mds_health_mutex);
120         if (m->mds_regular_service != NULL) {
121                 ptlrpc_unregister_service(m->mds_regular_service);
122                 m->mds_regular_service = NULL;
123         }
124         if (m->mds_readpage_service != NULL) {
125                 ptlrpc_unregister_service(m->mds_readpage_service);
126                 m->mds_readpage_service = NULL;
127         }
128         if (m->mds_out_service != NULL) {
129                 ptlrpc_unregister_service(m->mds_out_service);
130                 m->mds_out_service = NULL;
131         }
132         if (m->mds_setattr_service != NULL) {
133                 ptlrpc_unregister_service(m->mds_setattr_service);
134                 m->mds_setattr_service = NULL;
135         }
136         if (m->mds_mdsc_service != NULL) {
137                 ptlrpc_unregister_service(m->mds_mdsc_service);
138                 m->mds_mdsc_service = NULL;
139         }
140         if (m->mds_mdss_service != NULL) {
141                 ptlrpc_unregister_service(m->mds_mdss_service);
142                 m->mds_mdss_service = NULL;
143         }
144         if (m->mds_fld_service != NULL) {
145                 ptlrpc_unregister_service(m->mds_fld_service);
146                 m->mds_fld_service = NULL;
147         }
148         if (m->mds_io_service != NULL) {
149                 ptlrpc_unregister_service(m->mds_io_service);
150                 m->mds_io_service = NULL;
151         }
152         mutex_unlock(&m->mds_health_mutex);
153
154         if (mdt_io_cptable != NULL) {
155                 cfs_cpt_table_free(mdt_io_cptable);
156                 mdt_io_cptable = NULL;
157         }
158
159         EXIT;
160 }
161
162 static int mds_start_ptlrpc_service(struct mds_device *m)
163 {
164         static struct ptlrpc_service_conf conf;
165         struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
166         nodemask_t *mask;
167         int rc = 0;
168
169         ENTRY;
170
171         conf = (typeof(conf)) {
172                 .psc_name               = LUSTRE_MDT_NAME,
173                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
174                 .psc_buf                = {
175                         .bc_nbufs               = MDS_NBUFS,
176                         .bc_buf_size            = MDS_REG_BUFSIZE,
177                         .bc_req_max_size        = MDS_REG_MAXREQSIZE,
178                         .bc_rep_max_size        = MDS_REG_MAXREPSIZE,
179                         .bc_req_portal          = MDS_REQUEST_PORTAL,
180                         .bc_rep_portal          = MDC_REPLY_PORTAL,
181                 },
182                 /*
183                  * We'd like to have a mechanism to set this on a per-device
184                  * basis, but alas...
185                  */
186                 .psc_thr                = {
187                         .tc_thr_name            = LUSTRE_MDT_NAME,
188                         .tc_thr_factor          = MDS_THR_FACTOR,
189                         .tc_nthrs_init          = MDS_NTHRS_INIT,
190                         .tc_nthrs_base          = MDS_NTHRS_BASE,
191                         .tc_nthrs_max           = MDS_NTHRS_MAX,
192                         .tc_nthrs_user          = mds_num_threads,
193                         .tc_cpu_affinity        = 1,
194                         .tc_ctx_tags            = LCT_MD_THREAD,
195                 },
196                 .psc_cpt                = {
197                         .cc_pattern             = mds_num_cpts,
198                 },
199                 .psc_ops                = {
200                         .so_req_handler         = tgt_request_handle,
201                         .so_req_printer         = target_print_req,
202                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
203                 },
204         };
205         m->mds_regular_service = ptlrpc_register_service(&conf, &obd->obd_kset,
206                                                          obd->obd_debugfs_entry);
207         if (IS_ERR(m->mds_regular_service)) {
208                 rc = PTR_ERR(m->mds_regular_service);
209                 CERROR("failed to start regular mdt service: %d\n", rc);
210                 m->mds_regular_service = NULL;
211
212                 RETURN(rc);
213         }
214
215         /*
216          * readpage service configuration. Parameters have to be adjusted,
217          * ideally.
218          */
219         memset(&conf, 0, sizeof(conf));
220         conf = (typeof(conf)) {
221                 .psc_name               = LUSTRE_MDT_NAME "_readpage",
222                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
223                 .psc_buf                = {
224                         .bc_nbufs               = MDS_NBUFS,
225                         .bc_buf_size            = MDS_BUFSIZE,
226                         .bc_req_max_size        = MDS_MAXREQSIZE,
227                         .bc_rep_max_size        = MDS_MAXREPSIZE,
228                         .bc_req_portal          = MDS_READPAGE_PORTAL,
229                         .bc_rep_portal          = MDC_REPLY_PORTAL,
230                 },
231                 .psc_thr                = {
232                         .tc_thr_name            = LUSTRE_MDT_NAME "_rdpg",
233                         .tc_thr_factor          = MDS_RDPG_THR_FACTOR,
234                         .tc_nthrs_init          = MDS_RDPG_NTHRS_INIT,
235                         .tc_nthrs_base          = MDS_RDPG_NTHRS_BASE,
236                         .tc_nthrs_max           = MDS_RDPG_NTHRS_MAX,
237                         .tc_nthrs_user          = mds_rdpg_num_threads,
238                         .tc_cpu_affinity        = 1,
239                         .tc_ctx_tags            = LCT_MD_THREAD,
240                 },
241                 .psc_cpt                = {
242                         .cc_pattern             = mds_rdpg_num_cpts,
243                 },
244                 .psc_ops                = {
245                         .so_req_handler         = tgt_request_handle,
246                         .so_req_printer         = target_print_req,
247                 },
248         };
249         m->mds_readpage_service = ptlrpc_register_service(&conf, &obd->obd_kset,
250                                                           obd->obd_debugfs_entry);
251         if (IS_ERR(m->mds_readpage_service)) {
252                 rc = PTR_ERR(m->mds_readpage_service);
253                 CERROR("failed to start readpage service: %d\n", rc);
254                 m->mds_readpage_service = NULL;
255
256                 GOTO(err_mds_svc, rc);
257         }
258
259         /*
260          * setattr service configuration.
261          *
262          * XXX To keep the compatibility with old client(< 2.2), we need to
263          * preserve this portal for a certain time, it should be removed
264          * eventually. LU-617.
265          */
266         memset(&conf, 0, sizeof(conf));
267         conf = (typeof(conf)) {
268                 .psc_name               = LUSTRE_MDT_NAME "_setattr",
269                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
270                 .psc_buf                = {
271                         .bc_nbufs               = MDS_NBUFS,
272                         .bc_buf_size            = MDS_BUFSIZE,
273                         .bc_req_max_size        = MDS_MAXREQSIZE,
274                         .bc_rep_max_size        = MDS_LOV_MAXREPSIZE,
275                         .bc_req_portal          = MDS_SETATTR_PORTAL,
276                         .bc_rep_portal          = MDC_REPLY_PORTAL,
277                 },
278                 .psc_thr                = {
279                         .tc_thr_name            = LUSTRE_MDT_NAME "_attr",
280                         .tc_thr_factor          = MDS_SETA_THR_FACTOR,
281                         .tc_nthrs_init          = MDS_SETA_NTHRS_INIT,
282                         .tc_nthrs_base          = MDS_SETA_NTHRS_BASE,
283                         .tc_nthrs_max           = MDS_SETA_NTHRS_MAX,
284                         .tc_nthrs_user          = mds_attr_num_threads,
285                         .tc_cpu_affinity        = 1,
286                         .tc_ctx_tags            = LCT_MD_THREAD,
287                 },
288                 .psc_cpt                = {
289                         .cc_pattern             = mds_attr_num_cpts,
290                 },
291                 .psc_ops                = {
292                         .so_req_handler         = tgt_request_handle,
293                         .so_req_printer         = target_print_req,
294                         .so_hpreq_handler       = NULL,
295                 },
296         };
297         m->mds_setattr_service = ptlrpc_register_service(&conf, &obd->obd_kset,
298                                                          obd->obd_debugfs_entry);
299         if (IS_ERR(m->mds_setattr_service)) {
300                 rc = PTR_ERR(m->mds_setattr_service);
301                 CERROR("failed to start setattr service: %d\n", rc);
302                 m->mds_setattr_service = NULL;
303
304                 GOTO(err_mds_svc, rc);
305         }
306
307         /* Object update service */
308         conf = (typeof(conf)) {
309                 .psc_name               = LUSTRE_MDT_NAME "_out",
310                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
311                 .psc_buf                = {
312                         .bc_nbufs               = MDS_NBUFS,
313                         .bc_buf_size            = OUT_BUFSIZE,
314                         .bc_req_max_size        = OUT_MAXREQSIZE,
315                         .bc_rep_max_size        = OUT_MAXREPSIZE,
316                         .bc_req_portal          = OUT_PORTAL,
317                         .bc_rep_portal          = OSC_REPLY_PORTAL,
318                 },
319                 /*
320                  * We'd like to have a mechanism to set this on a per-device
321                  * basis, but alas...
322                  */
323                 .psc_thr                = {
324                         .tc_thr_name            = LUSTRE_MDT_NAME "_out",
325                         .tc_thr_factor          = MDS_THR_FACTOR,
326                         .tc_nthrs_init          = MDS_NTHRS_INIT,
327                         .tc_nthrs_base          = MDS_NTHRS_BASE,
328                         .tc_nthrs_max           = MDS_NTHRS_MAX,
329                         .tc_nthrs_user          = mds_num_threads,
330                         .tc_cpu_affinity        = 1,
331                         .tc_ctx_tags            = LCT_MD_THREAD |
332                                                   LCT_DT_THREAD,
333                 },
334                 .psc_cpt                = {
335                         .cc_pattern             = mds_num_cpts,
336                 },
337                 .psc_ops                = {
338                         .so_req_handler         = tgt_request_handle,
339                         .so_req_printer         = target_print_req,
340                         .so_hpreq_handler       = NULL,
341                 },
342         };
343         m->mds_out_service = ptlrpc_register_service(&conf, &obd->obd_kset,
344                                                      obd->obd_debugfs_entry);
345         if (IS_ERR(m->mds_out_service)) {
346                 rc = PTR_ERR(m->mds_out_service);
347                 CERROR("failed to start out service: %d\n", rc);
348                 m->mds_out_service = NULL;
349                 GOTO(err_mds_svc, rc);
350         }
351
352         /*
353          * sequence controller service configuration
354          */
355         memset(&conf, 0, sizeof(conf));
356         conf = (typeof(conf)) {
357                 .psc_name               = LUSTRE_MDT_NAME "_seqs",
358                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
359                 .psc_buf                = {
360                         .bc_nbufs               = MDS_NBUFS,
361                         .bc_buf_size            = SEQ_BUFSIZE,
362                         .bc_req_max_size        = SEQ_MAXREQSIZE,
363                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
364                         .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
365                         .bc_rep_portal          = MDC_REPLY_PORTAL,
366                 },
367                 .psc_thr                = {
368                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqs",
369                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
370                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
371                         .tc_ctx_tags            = LCT_MD_THREAD,
372                 },
373                 .psc_ops                = {
374                         .so_req_handler         = tgt_request_handle,
375                         .so_req_printer         = target_print_req,
376                         .so_hpreq_handler       = NULL,
377                 },
378         };
379         m->mds_mdsc_service = ptlrpc_register_service(&conf, &obd->obd_kset,
380                                                       obd->obd_debugfs_entry);
381         if (IS_ERR(m->mds_mdsc_service)) {
382                 rc = PTR_ERR(m->mds_mdsc_service);
383                 CERROR("failed to start seq controller service: %d\n", rc);
384                 m->mds_mdsc_service = NULL;
385
386                 GOTO(err_mds_svc, rc);
387         }
388
389         /*
390          * metadata sequence server service configuration
391          */
392         memset(&conf, 0, sizeof(conf));
393         conf = (typeof(conf)) {
394                 .psc_name               = LUSTRE_MDT_NAME "_seqm",
395                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
396                 .psc_buf                = {
397                         .bc_nbufs               = MDS_NBUFS,
398                         .bc_buf_size            = SEQ_BUFSIZE,
399                         .bc_req_max_size        = SEQ_MAXREQSIZE,
400                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
401                         .bc_req_portal          = SEQ_METADATA_PORTAL,
402                         .bc_rep_portal          = MDC_REPLY_PORTAL,
403                 },
404                 .psc_thr                = {
405                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqm",
406                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
407                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
408                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
409                 },
410                 .psc_ops                = {
411                         .so_req_handler         = tgt_request_handle,
412                         .so_req_printer         = target_print_req,
413                         .so_hpreq_handler       = NULL,
414                 },
415         };
416         m->mds_mdss_service = ptlrpc_register_service(&conf, &obd->obd_kset,
417                                                       obd->obd_debugfs_entry);
418         if (IS_ERR(m->mds_mdss_service)) {
419                 rc = PTR_ERR(m->mds_mdss_service);
420                 CERROR("failed to start metadata seq server service: %d\n", rc);
421                 m->mds_mdss_service = NULL;
422
423                 GOTO(err_mds_svc, rc);
424         }
425
426         /* FLD service start */
427         memset(&conf, 0, sizeof(conf));
428         conf = (typeof(conf)) {
429                 .psc_name            = LUSTRE_MDT_NAME "_fld",
430                 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
431                 .psc_buf                = {
432                         .bc_nbufs               = MDS_NBUFS,
433                         .bc_buf_size            = FLD_BUFSIZE,
434                         .bc_req_max_size        = FLD_MAXREQSIZE,
435                         .bc_rep_max_size        = FLD_MAXREPSIZE,
436                         .bc_req_portal          = FLD_REQUEST_PORTAL,
437                         .bc_rep_portal          = MDC_REPLY_PORTAL,
438                 },
439                 .psc_thr                = {
440                         .tc_thr_name            = LUSTRE_MDT_NAME "_fld",
441                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
442                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
443                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD,
444                 },
445                 .psc_ops                = {
446                         .so_req_handler         = tgt_request_handle,
447                         .so_req_printer         = target_print_req,
448                         .so_hpreq_handler       = NULL,
449                 },
450         };
451         m->mds_fld_service = ptlrpc_register_service(&conf, &obd->obd_kset,
452                                                      obd->obd_debugfs_entry);
453         if (IS_ERR(m->mds_fld_service)) {
454                 rc = PTR_ERR(m->mds_fld_service);
455                 CERROR("failed to start fld service: %d\n", rc);
456                 m->mds_fld_service = NULL;
457
458                 GOTO(err_mds_svc, rc);
459         }
460
461
462         mask = cfs_cpt_nodemask(cfs_cpt_table, CFS_CPT_ANY);
463         /* event CPT feature is disabled in libcfs level by set partition
464          * number to 1, we still want to set node affinity for io service */
465         if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
466                 int cpt = 0;
467                 int i;
468
469                 mdt_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
470                 for_each_node_mask(i, *mask) {
471                         if (mdt_io_cptable == NULL) {
472                                 CWARN("MDS failed to create CPT table\n");
473                                 break;
474                         }
475
476                         rc = cfs_cpt_set_node(mdt_io_cptable, cpt++, i);
477                         if (!rc) {
478                                 CWARN("MDS Failed to set node %d for"
479                                       "IO CPT table\n", i);
480                                 cfs_cpt_table_free(mdt_io_cptable);
481                                 mdt_io_cptable = NULL;
482                                 break;
483                         }
484                 }
485         }
486
487         memset(&conf, 0, sizeof(conf));
488         conf = (typeof(conf)) {
489                 .psc_name               = LUSTRE_MDT_NAME "_io",
490                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
491                 .psc_buf                = {
492                         .bc_nbufs               = OST_NBUFS,
493                         .bc_buf_size            = OST_IO_BUFSIZE,
494                         .bc_req_max_size        = OST_IO_MAXREQSIZE,
495                         .bc_rep_max_size        = OST_IO_MAXREPSIZE,
496                         .bc_req_portal          = MDS_IO_PORTAL,
497                         .bc_rep_portal          = MDC_REPLY_PORTAL,
498                 },
499                 .psc_thr                = {
500                         .tc_thr_name            = LUSTRE_MDT_NAME "_io",
501                         .tc_thr_factor          = OSS_THR_FACTOR,
502                         .tc_nthrs_init          = OSS_NTHRS_INIT,
503                         .tc_nthrs_base          = OSS_NTHRS_BASE,
504                         .tc_nthrs_max           = mds_max_io_threads,
505                         .tc_nthrs_user          = mds_num_threads,
506                         .tc_cpu_affinity        = 1,
507                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD,
508                 },
509                 .psc_cpt                = {
510                         .cc_cptable             = mdt_io_cptable,
511                         .cc_pattern             = mdt_io_cptable == NULL ?
512                                                   mds_io_num_cpts : NULL,
513                 },
514                 .psc_ops                = {
515                         .so_thr_init            = tgt_io_thread_init,
516                         .so_thr_done            = tgt_io_thread_done,
517                         .so_req_handler         = tgt_request_handle,
518                         .so_req_printer         = target_print_req,
519                         .so_hpreq_handler       = tgt_hpreq_handler,
520                 },
521         };
522         m->mds_io_service = ptlrpc_register_service(&conf, &obd->obd_kset,
523                                                     obd->obd_debugfs_entry);
524         if (IS_ERR(m->mds_io_service)) {
525                 rc = PTR_ERR(m->mds_io_service);
526                 CERROR("failed to start MDT I/O service: %d\n", rc);
527                 m->mds_io_service = NULL;
528                 GOTO(err_mds_svc, rc);
529         }
530
531         EXIT;
532 err_mds_svc:
533         if (rc)
534                 mds_stop_ptlrpc_service(m);
535
536         return rc;
537 }
538
539 static inline struct mds_device *mds_dev(struct lu_device *d)
540 {
541         return container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
542 }
543
544 static struct lu_device *mds_device_fini(const struct lu_env *env,
545                                          struct lu_device *d)
546 {
547         struct mds_device *m = mds_dev(d);
548         struct obd_device *obd = d->ld_obd;
549         ENTRY;
550
551         mds_stop_ptlrpc_service(m);
552         lprocfs_obd_cleanup(obd);
553         RETURN(NULL);
554 }
555
556 static struct lu_device *mds_device_free(const struct lu_env *env,
557                                          struct lu_device *d)
558 {
559         struct mds_device *m = mds_dev(d);
560         ENTRY;
561
562         md_device_fini(&m->mds_md_dev);
563         OBD_FREE_PTR(m);
564         RETURN(NULL);
565 }
566
567 static struct lu_device *mds_device_alloc(const struct lu_env *env,
568                                           struct lu_device_type *t,
569                                           struct lustre_cfg *cfg)
570 {
571         struct mds_device        *m;
572         struct obd_device        *obd;
573         struct lu_device          *l;
574         int rc;
575
576         OBD_ALLOC_PTR(m);
577         if (m == NULL)
578                 return ERR_PTR(-ENOMEM);
579
580         md_device_init(&m->mds_md_dev, t);
581         l = &m->mds_md_dev.md_lu_dev;
582
583         obd = class_name2obd(lustre_cfg_string(cfg, 0));
584         LASSERT(obd != NULL);
585
586         l->ld_obd = obd;
587         /* set this lu_device to obd, because error handling need it */
588         obd->obd_lu_dev = l;
589
590         rc = lprocfs_obd_setup(obd, true);
591         if (rc != 0) {
592                 mds_device_free(env, l);
593                 l = ERR_PTR(rc);
594                 return l;
595         }
596
597         mutex_init(&m->mds_health_mutex);
598
599         rc = mds_start_ptlrpc_service(m);
600         if (rc != 0) {
601                 lprocfs_obd_cleanup(obd);
602                 mds_device_free(env, l);
603                 l = ERR_PTR(rc);
604                 return l;
605         }
606         return l;
607 }
608
/*
 * Type constructor/destructor.  The mds_type_init, mds_type_fini,
 * mds_type_start and mds_type_stop callbacks referenced below are not
 * defined explicitly in this file; presumably LU_TYPE_INIT_FINI(mds, ...)
 * generates them for the mdt_thread_key context key -- confirm against the
 * macro definition.
 */
LU_TYPE_INIT_FINI(mds, &mdt_thread_key);

/* device-type callbacks for the MDS layer */
static struct lu_device_type_operations mds_device_type_ops = {
        .ldto_init = mds_type_init,
        .ldto_fini = mds_type_fini,

        .ldto_start = mds_type_start,
        .ldto_stop  = mds_type_stop,

        .ldto_device_alloc = mds_device_alloc,
        .ldto_device_free  = mds_device_free,
        .ldto_device_fini  = mds_device_fini
};

/* device type handed to class_register_type() in mds_mod_init() */
static struct lu_device_type mds_device_type = {
        .ldt_tags     = LU_DEVICE_MD,
        .ldt_name     = LUSTRE_MDS_NAME,
        .ldt_ops      = &mds_device_type_ops,
        .ldt_ctx_tags = LCT_MD_THREAD
};
630
631 static int mds_health_check(const struct lu_env *env, struct obd_device *obd)
632 {
633         struct mds_device *mds = mds_dev(obd->obd_lu_dev);
634         int rc = 0;
635
636
637         mutex_lock(&mds->mds_health_mutex);
638         rc |= ptlrpc_service_health_check(mds->mds_regular_service);
639         rc |= ptlrpc_service_health_check(mds->mds_readpage_service);
640         rc |= ptlrpc_service_health_check(mds->mds_out_service);
641         rc |= ptlrpc_service_health_check(mds->mds_setattr_service);
642         rc |= ptlrpc_service_health_check(mds->mds_mdsc_service);
643         rc |= ptlrpc_service_health_check(mds->mds_mdss_service);
644         rc |= ptlrpc_service_health_check(mds->mds_fld_service);
645         rc |= ptlrpc_service_health_check(mds->mds_io_service);
646         mutex_unlock(&mds->mds_health_mutex);
647
648         return rc != 0 ? 1 : 0;
649 }
650
/*
 * obd operations of the MDS type; only the owner and the health check are
 * provided here.
 */
static struct obd_ops mds_obd_device_ops = {
        .o_owner           = THIS_MODULE,
        .o_health_check    = mds_health_check,
};
655
656 int mds_mod_init(void)
657 {
658         return class_register_type(&mds_obd_device_ops, NULL, false, NULL,
659                                    LUSTRE_MDS_NAME, &mds_device_type);
660 }
661
662 void mds_mod_exit(void)
663 {
664         class_unregister_type(LUSTRE_MDS_NAME);
665 }