Whamcloud - gitweb
LU-13004 ptlrpc: Allow BULK_BUF_KIOV to accept a kvec
[fs/lustre-release.git] / lustre / mdt / mdt_mds.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2013, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_mds.c
32  *
33  * Lustre Metadata Service Layer
34  *
35  * Author: Di Wang <di.wang@whamcloud.com>
36  **/
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 #include <obd_support.h>
43 /* struct ptlrpc_request */
44 #include <lustre_net.h>
45 /* struct obd_export */
46 #include <lustre_export.h>
47 /* struct obd_device */
48 #include <obd.h>
49 /* lu2dt_dev() */
50 #include <dt_object.h>
51 #include <lustre_mds.h>
52 #include "mdt_internal.h"
53 #include <lustre_quota.h>
54 #include <lustre_acl.h>
55 #include <uapi/linux/lustre/lustre_param.h>
56
57 struct mds_device {
58         /* super-class */
59         struct md_device         mds_md_dev;
60         struct ptlrpc_service   *mds_regular_service;
61         struct ptlrpc_service   *mds_readpage_service;
62         struct ptlrpc_service   *mds_out_service;
63         struct ptlrpc_service   *mds_setattr_service;
64         struct ptlrpc_service   *mds_mdsc_service;
65         struct ptlrpc_service   *mds_mdss_service;
66         struct ptlrpc_service   *mds_fld_service;
67         struct ptlrpc_service   *mds_io_service;
68         struct mutex             mds_health_mutex;
69 };
70
71 /*
72  *  * Initialized in mds_mod_init().
73  *   */
74 static unsigned long mds_num_threads;
75 module_param(mds_num_threads, ulong, 0444);
76 MODULE_PARM_DESC(mds_num_threads, "number of MDS service threads to start");
77
78 static unsigned int mds_cpu_bind = 1;
79 module_param(mds_cpu_bind, uint, 0444);
80 MODULE_PARM_DESC(mds_cpu_bind,
81                  "bind MDS threads to particular CPU partitions");
82
83 int mds_max_io_threads = 512;
84 module_param(mds_max_io_threads, int, 0444);
85 MODULE_PARM_DESC(mds_max_io_threads,
86                  "maximum number of MDS IO service threads");
87
88 static unsigned int mds_io_cpu_bind = 1;
89 module_param(mds_io_cpu_bind, uint, 0444);
90 MODULE_PARM_DESC(mds_io_cpu_bind,
91                  "bind MDS IO threads to particular CPU partitions");
92
93 static char *mds_io_num_cpts;
94 module_param(mds_io_num_cpts, charp, 0444);
95 MODULE_PARM_DESC(mds_io_num_cpts,
96                  "CPU partitions MDS IO threads should run on");
97
98 static struct cfs_cpt_table *mdt_io_cptable;
99
100 static char *mds_num_cpts;
101 module_param(mds_num_cpts, charp, 0444);
102 MODULE_PARM_DESC(mds_num_cpts, "CPU partitions MDS threads should run on");
103
104 static unsigned long mds_rdpg_num_threads;
105 module_param(mds_rdpg_num_threads, ulong, 0444);
106 MODULE_PARM_DESC(mds_rdpg_num_threads,
107                  "number of MDS readpage service threads to start");
108
109 static unsigned int mds_rdpg_cpu_bind = 1;
110 module_param(mds_rdpg_cpu_bind, uint, 0444);
111 MODULE_PARM_DESC(mds_rdpg_cpu_bind,
112                  "bind MDS readpage threads to particular CPU partitions");
113
114 static char *mds_rdpg_num_cpts;
115 module_param(mds_rdpg_num_cpts, charp, 0444);
116 MODULE_PARM_DESC(mds_rdpg_num_cpts,
117                  "CPU partitions MDS readpage threads should run on");
118
119 /* NB: these two should be removed along with setattr service in the future */
120 static unsigned long mds_attr_num_threads;
121 module_param(mds_attr_num_threads, ulong, 0444);
122 MODULE_PARM_DESC(mds_attr_num_threads,
123                  "number of MDS setattr service threads to start");
124
125 static unsigned int mds_attr_cpu_bind = 1;
126 module_param(mds_attr_cpu_bind, uint, 0444);
127 MODULE_PARM_DESC(mds_attr_cpu_bind,
128                  "bind MDS setattr threads to particular CPU partitions");
129
130 static char *mds_attr_num_cpts;
131 module_param(mds_attr_num_cpts, charp, 0444);
132 MODULE_PARM_DESC(mds_attr_num_cpts,
133                  "CPU partitions MDS setattr threads should run on");
134
135 /* device init/fini methods */
136 static void mds_stop_ptlrpc_service(struct mds_device *m)
137 {
138         ENTRY;
139
140         mutex_lock(&m->mds_health_mutex);
141         if (m->mds_regular_service != NULL) {
142                 ptlrpc_unregister_service(m->mds_regular_service);
143                 m->mds_regular_service = NULL;
144         }
145         if (m->mds_readpage_service != NULL) {
146                 ptlrpc_unregister_service(m->mds_readpage_service);
147                 m->mds_readpage_service = NULL;
148         }
149         if (m->mds_out_service != NULL) {
150                 ptlrpc_unregister_service(m->mds_out_service);
151                 m->mds_out_service = NULL;
152         }
153         if (m->mds_setattr_service != NULL) {
154                 ptlrpc_unregister_service(m->mds_setattr_service);
155                 m->mds_setattr_service = NULL;
156         }
157         if (m->mds_mdsc_service != NULL) {
158                 ptlrpc_unregister_service(m->mds_mdsc_service);
159                 m->mds_mdsc_service = NULL;
160         }
161         if (m->mds_mdss_service != NULL) {
162                 ptlrpc_unregister_service(m->mds_mdss_service);
163                 m->mds_mdss_service = NULL;
164         }
165         if (m->mds_fld_service != NULL) {
166                 ptlrpc_unregister_service(m->mds_fld_service);
167                 m->mds_fld_service = NULL;
168         }
169         if (m->mds_io_service != NULL) {
170                 ptlrpc_unregister_service(m->mds_io_service);
171                 m->mds_io_service = NULL;
172         }
173         mutex_unlock(&m->mds_health_mutex);
174
175         if (mdt_io_cptable != NULL) {
176                 cfs_cpt_table_free(mdt_io_cptable);
177                 mdt_io_cptable = NULL;
178         }
179
180         EXIT;
181 }
182
183 static int mds_start_ptlrpc_service(struct mds_device *m)
184 {
185         static struct ptlrpc_service_conf conf;
186         struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
187         nodemask_t *mask;
188         int rc = 0;
189
190         ENTRY;
191
192         conf = (typeof(conf)) {
193                 .psc_name               = LUSTRE_MDT_NAME,
194                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
195                 .psc_buf                = {
196                         .bc_nbufs               = MDS_NBUFS,
197                         .bc_buf_size            = MDS_REG_BUFSIZE,
198                         .bc_req_max_size        = MDS_REG_MAXREQSIZE,
199                         .bc_rep_max_size        = MDS_REG_MAXREPSIZE,
200                         .bc_req_portal          = MDS_REQUEST_PORTAL,
201                         .bc_rep_portal          = MDC_REPLY_PORTAL,
202                 },
203                 /*
204                  * We'd like to have a mechanism to set this on a per-device
205                  * basis, but alas...
206                  */
207                 .psc_thr                = {
208                         .tc_thr_name            = LUSTRE_MDT_NAME,
209                         .tc_thr_factor          = MDS_THR_FACTOR,
210                         .tc_nthrs_init          = MDS_NTHRS_INIT,
211                         .tc_nthrs_base          = MDS_NTHRS_BASE,
212                         .tc_nthrs_max           = MDS_NTHRS_MAX,
213                         .tc_nthrs_user          = mds_num_threads,
214                         .tc_cpu_bind            = mds_cpu_bind,
215                         /* LCT_DT_THREAD is required as MDT threads may scan
216                          * all LDLM namespaces (including OFD-originated) to
217                          * cancel LDLM locks */
218                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
219                 },
220                 .psc_cpt                = {
221                         .cc_pattern             = mds_num_cpts,
222                         .cc_affinity            = true,
223                 },
224                 .psc_ops                = {
225                         .so_req_handler         = tgt_request_handle,
226                         .so_req_printer         = target_print_req,
227                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
228                 },
229         };
230         m->mds_regular_service = ptlrpc_register_service(&conf, &obd->obd_kset,
231                                                          obd->obd_debugfs_entry);
232         if (IS_ERR(m->mds_regular_service)) {
233                 rc = PTR_ERR(m->mds_regular_service);
234                 CERROR("failed to start regular mdt service: %d\n", rc);
235                 m->mds_regular_service = NULL;
236
237                 RETURN(rc);
238         }
239
240         /*
241          * readpage service configuration. Parameters have to be adjusted,
242          * ideally.
243          */
244         memset(&conf, 0, sizeof(conf));
245         conf = (typeof(conf)) {
246                 .psc_name               = LUSTRE_MDT_NAME "_readpage",
247                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
248                 .psc_buf                = {
249                         .bc_nbufs               = MDS_NBUFS,
250                         .bc_buf_size            = MDS_BUFSIZE,
251                         .bc_req_max_size        = MDS_MAXREQSIZE,
252                         .bc_rep_max_size        = MDS_MAXREPSIZE,
253                         .bc_req_portal          = MDS_READPAGE_PORTAL,
254                         .bc_rep_portal          = MDC_REPLY_PORTAL,
255                 },
256                 .psc_thr                = {
257                         .tc_thr_name            = LUSTRE_MDT_NAME "_rdpg",
258                         .tc_thr_factor          = MDS_RDPG_THR_FACTOR,
259                         .tc_nthrs_init          = MDS_RDPG_NTHRS_INIT,
260                         .tc_nthrs_base          = MDS_RDPG_NTHRS_BASE,
261                         .tc_nthrs_max           = MDS_RDPG_NTHRS_MAX,
262                         .tc_nthrs_user          = mds_rdpg_num_threads,
263                         .tc_cpu_bind            = mds_rdpg_cpu_bind,
264                         .tc_ctx_tags            = LCT_MD_THREAD,
265                 },
266                 .psc_cpt                = {
267                         .cc_pattern             = mds_rdpg_num_cpts,
268                         .cc_affinity            = true,
269                 },
270                 .psc_ops                = {
271                         .so_req_handler         = tgt_request_handle,
272                         .so_req_printer         = target_print_req,
273                 },
274         };
275         m->mds_readpage_service = ptlrpc_register_service(&conf, &obd->obd_kset,
276                                                           obd->obd_debugfs_entry);
277         if (IS_ERR(m->mds_readpage_service)) {
278                 rc = PTR_ERR(m->mds_readpage_service);
279                 CERROR("failed to start readpage service: %d\n", rc);
280                 m->mds_readpage_service = NULL;
281
282                 GOTO(err_mds_svc, rc);
283         }
284
285         /*
286          * setattr service configuration.
287          *
288          * XXX To keep the compatibility with old client(< 2.2), we need to
289          * preserve this portal for a certain time, it should be removed
290          * eventually. LU-617.
291          */
292         memset(&conf, 0, sizeof(conf));
293         conf = (typeof(conf)) {
294                 .psc_name               = LUSTRE_MDT_NAME "_setattr",
295                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
296                 .psc_buf                = {
297                         .bc_nbufs               = MDS_NBUFS,
298                         .bc_buf_size            = MDS_BUFSIZE,
299                         .bc_req_max_size        = MDS_MAXREQSIZE,
300                         .bc_rep_max_size        = MDS_LOV_MAXREPSIZE,
301                         .bc_req_portal          = MDS_SETATTR_PORTAL,
302                         .bc_rep_portal          = MDC_REPLY_PORTAL,
303                 },
304                 .psc_thr                = {
305                         .tc_thr_name            = LUSTRE_MDT_NAME "_attr",
306                         .tc_thr_factor          = MDS_SETA_THR_FACTOR,
307                         .tc_nthrs_init          = MDS_SETA_NTHRS_INIT,
308                         .tc_nthrs_base          = MDS_SETA_NTHRS_BASE,
309                         .tc_nthrs_max           = MDS_SETA_NTHRS_MAX,
310                         .tc_nthrs_user          = mds_attr_num_threads,
311                         .tc_cpu_bind            = mds_attr_cpu_bind,
312                         .tc_ctx_tags            = LCT_MD_THREAD,
313                 },
314                 .psc_cpt                = {
315                         .cc_pattern             = mds_attr_num_cpts,
316                         .cc_affinity            = true,
317                 },
318                 .psc_ops                = {
319                         .so_req_handler         = tgt_request_handle,
320                         .so_req_printer         = target_print_req,
321                         .so_hpreq_handler       = NULL,
322                 },
323         };
324         m->mds_setattr_service = ptlrpc_register_service(&conf, &obd->obd_kset,
325                                                          obd->obd_debugfs_entry);
326         if (IS_ERR(m->mds_setattr_service)) {
327                 rc = PTR_ERR(m->mds_setattr_service);
328                 CERROR("failed to start setattr service: %d\n", rc);
329                 m->mds_setattr_service = NULL;
330
331                 GOTO(err_mds_svc, rc);
332         }
333
334         /* Object update service */
335         conf = (typeof(conf)) {
336                 .psc_name               = LUSTRE_MDT_NAME "_out",
337                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
338                 .psc_buf                = {
339                         .bc_nbufs               = MDS_NBUFS,
340                         .bc_buf_size            = OUT_BUFSIZE,
341                         .bc_req_max_size        = OUT_MAXREQSIZE,
342                         .bc_rep_max_size        = OUT_MAXREPSIZE,
343                         .bc_req_portal          = OUT_PORTAL,
344                         .bc_rep_portal          = OSC_REPLY_PORTAL,
345                 },
346                 /*
347                  * We'd like to have a mechanism to set this on a per-device
348                  * basis, but alas...
349                  */
350                 .psc_thr                = {
351                         .tc_thr_name            = LUSTRE_MDT_NAME "_out",
352                         .tc_thr_factor          = MDS_THR_FACTOR,
353                         .tc_nthrs_init          = MDS_NTHRS_INIT,
354                         .tc_nthrs_base          = MDS_NTHRS_BASE,
355                         .tc_nthrs_max           = MDS_NTHRS_MAX,
356                         .tc_nthrs_user          = mds_num_threads,
357                         .tc_cpu_bind            = mds_cpu_bind,
358                         .tc_ctx_tags            = LCT_MD_THREAD |
359                                                   LCT_DT_THREAD,
360                 },
361                 .psc_cpt                = {
362                         .cc_pattern             = mds_num_cpts,
363                         .cc_affinity            = true,
364                 },
365                 .psc_ops                = {
366                         .so_req_handler         = tgt_request_handle,
367                         .so_req_printer         = target_print_req,
368                         .so_hpreq_handler       = NULL,
369                 },
370         };
371         m->mds_out_service = ptlrpc_register_service(&conf, &obd->obd_kset,
372                                                      obd->obd_debugfs_entry);
373         if (IS_ERR(m->mds_out_service)) {
374                 rc = PTR_ERR(m->mds_out_service);
375                 CERROR("failed to start out service: %d\n", rc);
376                 m->mds_out_service = NULL;
377                 GOTO(err_mds_svc, rc);
378         }
379
380         /*
381          * sequence controller service configuration
382          */
383         memset(&conf, 0, sizeof(conf));
384         conf = (typeof(conf)) {
385                 .psc_name               = LUSTRE_MDT_NAME "_seqs",
386                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
387                 .psc_buf                = {
388                         .bc_nbufs               = MDS_NBUFS,
389                         .bc_buf_size            = SEQ_BUFSIZE,
390                         .bc_req_max_size        = SEQ_MAXREQSIZE,
391                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
392                         .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
393                         .bc_rep_portal          = MDC_REPLY_PORTAL,
394                 },
395                 .psc_thr                = {
396                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqs",
397                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
398                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
399                         .tc_ctx_tags            = LCT_MD_THREAD,
400                 },
401                 .psc_ops                = {
402                         .so_req_handler         = tgt_request_handle,
403                         .so_req_printer         = target_print_req,
404                         .so_hpreq_handler       = NULL,
405                 },
406         };
407         m->mds_mdsc_service = ptlrpc_register_service(&conf, &obd->obd_kset,
408                                                       obd->obd_debugfs_entry);
409         if (IS_ERR(m->mds_mdsc_service)) {
410                 rc = PTR_ERR(m->mds_mdsc_service);
411                 CERROR("failed to start seq controller service: %d\n", rc);
412                 m->mds_mdsc_service = NULL;
413
414                 GOTO(err_mds_svc, rc);
415         }
416
417         /*
418          * metadata sequence server service configuration
419          */
420         memset(&conf, 0, sizeof(conf));
421         conf = (typeof(conf)) {
422                 .psc_name               = LUSTRE_MDT_NAME "_seqm",
423                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
424                 .psc_buf                = {
425                         .bc_nbufs               = MDS_NBUFS,
426                         .bc_buf_size            = SEQ_BUFSIZE,
427                         .bc_req_max_size        = SEQ_MAXREQSIZE,
428                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
429                         .bc_req_portal          = SEQ_METADATA_PORTAL,
430                         .bc_rep_portal          = MDC_REPLY_PORTAL,
431                 },
432                 .psc_thr                = {
433                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqm",
434                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
435                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
436                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
437                 },
438                 .psc_ops                = {
439                         .so_req_handler         = tgt_request_handle,
440                         .so_req_printer         = target_print_req,
441                         .so_hpreq_handler       = NULL,
442                 },
443         };
444         m->mds_mdss_service = ptlrpc_register_service(&conf, &obd->obd_kset,
445                                                       obd->obd_debugfs_entry);
446         if (IS_ERR(m->mds_mdss_service)) {
447                 rc = PTR_ERR(m->mds_mdss_service);
448                 CERROR("failed to start metadata seq server service: %d\n", rc);
449                 m->mds_mdss_service = NULL;
450
451                 GOTO(err_mds_svc, rc);
452         }
453
454         /* FLD service start */
455         memset(&conf, 0, sizeof(conf));
456         conf = (typeof(conf)) {
457                 .psc_name            = LUSTRE_MDT_NAME "_fld",
458                 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
459                 .psc_buf                = {
460                         .bc_nbufs               = MDS_NBUFS,
461                         .bc_buf_size            = FLD_BUFSIZE,
462                         .bc_req_max_size        = FLD_MAXREQSIZE,
463                         .bc_rep_max_size        = FLD_MAXREPSIZE,
464                         .bc_req_portal          = FLD_REQUEST_PORTAL,
465                         .bc_rep_portal          = MDC_REPLY_PORTAL,
466                 },
467                 .psc_thr                = {
468                         .tc_thr_name            = LUSTRE_MDT_NAME "_fld",
469                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
470                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
471                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD,
472                 },
473                 .psc_ops                = {
474                         .so_req_handler         = tgt_request_handle,
475                         .so_req_printer         = target_print_req,
476                         .so_hpreq_handler       = NULL,
477                 },
478         };
479         m->mds_fld_service = ptlrpc_register_service(&conf, &obd->obd_kset,
480                                                      obd->obd_debugfs_entry);
481         if (IS_ERR(m->mds_fld_service)) {
482                 rc = PTR_ERR(m->mds_fld_service);
483                 CERROR("failed to start fld service: %d\n", rc);
484                 m->mds_fld_service = NULL;
485
486                 GOTO(err_mds_svc, rc);
487         }
488
489
490         mask = cfs_cpt_nodemask(cfs_cpt_table, CFS_CPT_ANY);
491         /* event CPT feature is disabled in libcfs level by set partition
492          * number to 1, we still want to set node affinity for io service */
493         if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
494                 int cpt = 0;
495                 int i;
496
497                 mdt_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
498                 for_each_node_mask(i, *mask) {
499                         if (mdt_io_cptable == NULL) {
500                                 CWARN("MDS failed to create CPT table\n");
501                                 break;
502                         }
503
504                         rc = cfs_cpt_set_node(mdt_io_cptable, cpt++, i);
505                         if (!rc) {
506                                 CWARN("MDS Failed to set node %d for"
507                                       "IO CPT table\n", i);
508                                 cfs_cpt_table_free(mdt_io_cptable);
509                                 mdt_io_cptable = NULL;
510                                 break;
511                         }
512                 }
513         }
514
515         memset(&conf, 0, sizeof(conf));
516         conf = (typeof(conf)) {
517                 .psc_name               = LUSTRE_MDT_NAME "_io",
518                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
519                 .psc_buf                = {
520                         .bc_nbufs               = OST_NBUFS,
521                         .bc_buf_size            = OST_IO_BUFSIZE,
522                         .bc_req_max_size        = OST_IO_MAXREQSIZE,
523                         .bc_rep_max_size        = OST_IO_MAXREPSIZE,
524                         .bc_req_portal          = MDS_IO_PORTAL,
525                         .bc_rep_portal          = MDC_REPLY_PORTAL,
526                 },
527                 .psc_thr                = {
528                         .tc_thr_name            = LUSTRE_MDT_NAME "_io",
529                         .tc_thr_factor          = OSS_THR_FACTOR,
530                         .tc_nthrs_init          = OSS_NTHRS_INIT,
531                         .tc_nthrs_base          = OSS_NTHRS_BASE,
532                         .tc_nthrs_max           = mds_max_io_threads,
533                         .tc_nthrs_user          = mds_num_threads,
534                         .tc_cpu_bind            = mds_io_cpu_bind,
535                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD,
536                 },
537                 .psc_cpt                = {
538                         .cc_cptable             = mdt_io_cptable,
539                         .cc_pattern             = mdt_io_cptable == NULL ?
540                                                   mds_io_num_cpts : NULL,
541                         .cc_affinity            = true,
542                 },
543                 .psc_ops                = {
544                         .so_thr_init            = tgt_io_thread_init,
545                         .so_thr_done            = tgt_io_thread_done,
546                         .so_req_handler         = tgt_request_handle,
547                         .so_req_printer         = target_print_req,
548                         .so_hpreq_handler       = tgt_hpreq_handler,
549                 },
550         };
551         m->mds_io_service = ptlrpc_register_service(&conf, &obd->obd_kset,
552                                                     obd->obd_debugfs_entry);
553         if (IS_ERR(m->mds_io_service)) {
554                 rc = PTR_ERR(m->mds_io_service);
555                 CERROR("failed to start MDT I/O service: %d\n", rc);
556                 m->mds_io_service = NULL;
557                 GOTO(err_mds_svc, rc);
558         }
559
560         EXIT;
561 err_mds_svc:
562         if (rc)
563                 mds_stop_ptlrpc_service(m);
564
565         return rc;
566 }
567
568 static inline struct mds_device *mds_dev(struct lu_device *d)
569 {
570         return container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
571 }
572
573 static struct lu_device *mds_device_fini(const struct lu_env *env,
574                                          struct lu_device *d)
575 {
576         struct mds_device *m = mds_dev(d);
577         struct obd_device *obd = d->ld_obd;
578         ENTRY;
579
580         mds_stop_ptlrpc_service(m);
581         lprocfs_obd_cleanup(obd);
582         RETURN(NULL);
583 }
584
585 static struct lu_device *mds_device_free(const struct lu_env *env,
586                                          struct lu_device *d)
587 {
588         struct mds_device *m = mds_dev(d);
589         ENTRY;
590
591         md_device_fini(&m->mds_md_dev);
592         OBD_FREE_PTR(m);
593         RETURN(NULL);
594 }
595
596 static struct lu_device *mds_device_alloc(const struct lu_env *env,
597                                           struct lu_device_type *t,
598                                           struct lustre_cfg *cfg)
599 {
600         struct mds_device        *m;
601         struct obd_device        *obd;
602         struct lu_device          *l;
603         int rc;
604
605         OBD_ALLOC_PTR(m);
606         if (m == NULL)
607                 return ERR_PTR(-ENOMEM);
608
609         md_device_init(&m->mds_md_dev, t);
610         l = &m->mds_md_dev.md_lu_dev;
611
612         obd = class_name2obd(lustre_cfg_string(cfg, 0));
613         LASSERT(obd != NULL);
614
615         l->ld_obd = obd;
616         /* set this lu_device to obd, because error handling need it */
617         obd->obd_lu_dev = l;
618
619         rc = lprocfs_obd_setup(obd, true);
620         if (rc != 0) {
621                 mds_device_free(env, l);
622                 l = ERR_PTR(rc);
623                 return l;
624         }
625
626         mutex_init(&m->mds_health_mutex);
627
628         rc = mds_start_ptlrpc_service(m);
629         if (rc != 0) {
630                 lprocfs_obd_cleanup(obd);
631                 mds_device_free(env, l);
632                 l = ERR_PTR(rc);
633                 return l;
634         }
635         return l;
636 }
637
638 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
639 LU_TYPE_INIT_FINI(mds, &mdt_thread_key);
640
641 static struct lu_device_type_operations mds_device_type_ops = {
642         .ldto_init = mds_type_init,
643         .ldto_fini = mds_type_fini,
644
645         .ldto_start = mds_type_start,
646         .ldto_stop  = mds_type_stop,
647
648         .ldto_device_alloc = mds_device_alloc,
649         .ldto_device_free  = mds_device_free,
650         .ldto_device_fini  = mds_device_fini
651 };
652
653 static struct lu_device_type mds_device_type = {
654         .ldt_tags     = LU_DEVICE_MD,
655         .ldt_name     = LUSTRE_MDS_NAME,
656         .ldt_ops      = &mds_device_type_ops,
657         .ldt_ctx_tags = LCT_MD_THREAD
658 };
659
660 static int mds_health_check(const struct lu_env *env, struct obd_device *obd)
661 {
662         struct mds_device *mds = mds_dev(obd->obd_lu_dev);
663         int rc = 0;
664
665
666         mutex_lock(&mds->mds_health_mutex);
667         rc |= ptlrpc_service_health_check(mds->mds_regular_service);
668         rc |= ptlrpc_service_health_check(mds->mds_readpage_service);
669         rc |= ptlrpc_service_health_check(mds->mds_out_service);
670         rc |= ptlrpc_service_health_check(mds->mds_setattr_service);
671         rc |= ptlrpc_service_health_check(mds->mds_mdsc_service);
672         rc |= ptlrpc_service_health_check(mds->mds_mdss_service);
673         rc |= ptlrpc_service_health_check(mds->mds_fld_service);
674         rc |= ptlrpc_service_health_check(mds->mds_io_service);
675         mutex_unlock(&mds->mds_health_mutex);
676
677         return rc != 0 ? 1 : 0;
678 }
679
680 static const struct obd_ops mds_obd_device_ops = {
681         .o_owner           = THIS_MODULE,
682         .o_health_check    = mds_health_check,
683 };
684
685 int mds_mod_init(void)
686 {
687         return class_register_type(&mds_obd_device_ops, NULL, false, NULL,
688                                    LUSTRE_MDS_NAME, &mds_device_type);
689 }
690
691 void mds_mod_exit(void)
692 {
693         class_unregister_type(LUSTRE_MDS_NAME);
694 }