4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2013, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/mdt/mdt_mds.c
33 * Lustre Metadata Service Layer
35 * Author: Di Wang <di.wang@whamcloud.com>
38 #define DEBUG_SUBSYSTEM S_MDS
40 #include <linux/module.h>
42 #include <obd_support.h>
43 /* struct ptlrpc_request */
44 #include <lustre_net.h>
45 /* struct obd_export */
46 #include <lustre_export.h>
47 /* struct obd_device */
50 #include <dt_object.h>
51 #include <lustre_mds.h>
52 #include <lustre_mdt.h>
53 #include "mdt_internal.h"
54 #include <lustre_quota.h>
55 #include <lustre_acl.h>
56 #include <lustre_param.h>
57 #include <lustre_fsfilt.h>
61 struct md_device mds_md_dev;
62 struct ptlrpc_service *mds_regular_service;
63 struct ptlrpc_service *mds_readpage_service;
64 struct ptlrpc_service *mds_out_service;
65 struct ptlrpc_service *mds_setattr_service;
66 struct ptlrpc_service *mds_mdsc_service;
67 struct ptlrpc_service *mds_mdss_service;
68 struct ptlrpc_service *mds_fld_service;
72 * * Initialized in mdt_mod_init().
74 static unsigned long mdt_num_threads;
75 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
76 "number of MDS service threads to start "
77 "(deprecated in favor of mds_num_threads)");
79 static unsigned long mds_num_threads;
80 CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
81 "number of MDS service threads to start");
83 static char *mds_num_cpts;
84 CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
85 "CPU partitions MDS threads should run on");
87 static unsigned long mds_rdpg_num_threads;
88 CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
89 "number of MDS readpage service threads to start");
91 static char *mds_rdpg_num_cpts;
92 CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
93 "CPU partitions MDS readpage threads should run on");
95 /* NB: these two should be removed along with setattr service in the future */
96 static unsigned long mds_attr_num_threads;
97 CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
98 "number of MDS setattr service threads to start");
100 static char *mds_attr_num_cpts;
101 CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
102 "CPU partitions MDS setattr threads should run on");
104 #define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt) \
107 .mh_fail_id = OBD_FAIL_ ## opc ## _NET, \
114 /* Request with a format known in advance */
115 #define DEF_MDT_HDL(flags, name, fn) \
116 DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name)
118 /* Request with a format we do not yet know */
119 #define DEF_MDT_HDL_VAR(flags, name, fn) \
120 DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL)
122 /* Map one non-standard request format handler. This should probably get
123 * a common OBD_SET_INFO RPC opcode instead of this mismatch. */
124 #define RQF_MDS_SET_INFO RQF_OBD_SET_INFO
126 static struct mdt_handler mdt_mds_ops[] = {
127 DEF_MDT_HDL(0, MDS_CONNECT, mdt_connect),
128 DEF_MDT_HDL(0, MDS_DISCONNECT, mdt_disconnect),
129 DEF_MDT_HDL(0, MDS_SET_INFO, mdt_set_info),
130 DEF_MDT_HDL(0, MDS_GET_INFO, mdt_get_info),
131 DEF_MDT_HDL(0 | HABEO_REFERO, MDS_GETSTATUS, mdt_getstatus),
132 DEF_MDT_HDL(HABEO_CORPUS, MDS_GETATTR, mdt_getattr),
133 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, mdt_getattr_name),
134 DEF_MDT_HDL(HABEO_CORPUS, MDS_GETXATTR, mdt_getxattr),
135 DEF_MDT_HDL(0 | HABEO_REFERO, MDS_STATFS, mdt_statfs),
136 DEF_MDT_HDL(0 | MUTABOR, MDS_REINT, mdt_reint),
137 DEF_MDT_HDL(HABEO_CORPUS, MDS_CLOSE, mdt_close),
138 DEF_MDT_HDL(HABEO_CORPUS, MDS_DONE_WRITING, mdt_done_writing),
139 DEF_MDT_HDL(0 | HABEO_REFERO, MDS_PIN, mdt_pin),
140 DEF_MDT_HDL_VAR(0, MDS_SYNC, mdt_sync),
141 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR, mdt_is_subdir),
142 DEF_MDT_HDL(0, MDS_QUOTACHECK, mdt_quotacheck),
143 DEF_MDT_HDL(0, MDS_QUOTACTL, mdt_quotactl),
144 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_PROGRESS, mdt_hsm_progress),
145 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_CT_REGISTER,
146 mdt_hsm_ct_register),
147 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_CT_UNREGISTER,
148 mdt_hsm_ct_unregister),
149 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET,
151 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_SET,
153 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action),
154 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST, mdt_hsm_request),
155 DEF_MDT_HDL(HABEO_CORPUS|HABEO_REFERO, MDS_SWAP_LAYOUTS, mdt_swap_layouts)
158 #define DEF_OBD_HDL(flags, name, fn) \
159 DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL)
161 static struct mdt_handler mdt_obd_ops[] = {
162 DEF_OBD_HDL(0, OBD_PING, mdt_obd_ping),
163 DEF_OBD_HDL(0, OBD_LOG_CANCEL, mdt_obd_log_cancel),
164 DEF_OBD_HDL(0, OBD_QC_CALLBACK, mdt_obd_qc_callback),
165 DEF_OBD_HDL(0, OBD_IDX_READ, mdt_obd_idx_read)
168 #define DEF_DLM_HDL_VAR(flags, name, fn) \
169 DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL)
170 #define DEF_DLM_HDL(flags, name, fn) \
171 DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name)
173 static struct mdt_handler mdt_dlm_ops[] = {
174 DEF_DLM_HDL (HABEO_CLAVIS, LDLM_ENQUEUE, mdt_enqueue),
175 DEF_DLM_HDL_VAR(HABEO_CLAVIS, LDLM_CONVERT, mdt_convert),
176 DEF_DLM_HDL_VAR(0, LDLM_BL_CALLBACK, mdt_bl_callback),
177 DEF_DLM_HDL_VAR(0, LDLM_CP_CALLBACK, mdt_cp_callback)
180 #define DEF_LLOG_HDL(flags, name, fn) \
181 DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL)
183 static struct mdt_handler mdt_llog_ops[] = {
184 DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE, mdt_llog_create),
185 DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, mdt_llog_next_block),
186 DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, mdt_llog_read_header),
187 DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_WRITE_REC, NULL),
188 DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CLOSE, NULL),
189 DEF_LLOG_HDL(0, LLOG_ORIGIN_CONNECT, NULL),
190 DEF_LLOG_HDL(0, LLOG_CATINFO, NULL),
191 DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK, mdt_llog_prev_block),
192 DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_DESTROY, mdt_llog_destroy),
195 #define DEF_SEC_HDL(flags, name, fn) \
196 DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL)
198 static struct mdt_handler mdt_sec_ctx_ops[] = {
199 DEF_SEC_HDL(0, SEC_CTX_INIT, mdt_sec_ctx_handle),
200 DEF_SEC_HDL(0, SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
201 DEF_SEC_HDL(0, SEC_CTX_FINI, mdt_sec_ctx_handle)
204 #define DEF_QUOTA_HDL(flags, name, fn) \
205 DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name)
207 static struct mdt_handler mdt_quota_ops[] = {
208 DEF_QUOTA_HDL(HABEO_REFERO, QUOTA_DQACQ, mdt_quota_dqacq),
211 struct mdt_opc_slice mdt_regular_handlers[] = {
213 .mos_opc_start = MDS_GETATTR,
214 .mos_opc_end = MDS_LAST_OPC,
215 .mos_hs = mdt_mds_ops
218 .mos_opc_start = OBD_PING,
219 .mos_opc_end = OBD_LAST_OPC,
220 .mos_hs = mdt_obd_ops
223 .mos_opc_start = LDLM_ENQUEUE,
224 .mos_opc_end = LDLM_LAST_OPC,
225 .mos_hs = mdt_dlm_ops
228 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
229 .mos_opc_end = LLOG_LAST_OPC,
230 .mos_hs = mdt_llog_ops
233 .mos_opc_start = SEC_CTX_INIT,
234 .mos_opc_end = SEC_LAST_OPC,
235 .mos_hs = mdt_sec_ctx_ops
238 .mos_opc_start = QUOTA_DQACQ,
239 .mos_opc_end = QUOTA_LAST_OPC,
240 .mos_hs = mdt_quota_ops
247 /* Readpage/readdir handlers */
248 static struct mdt_handler mdt_readpage_ops[] = {
249 DEF_MDT_HDL(0, MDS_CONNECT, mdt_connect),
250 DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage),
251 /* XXX: this is ugly and should be fixed one day, see mdc_close() for
252 * detailed comments. --umka */
253 DEF_MDT_HDL(HABEO_CORPUS, MDS_CLOSE, mdt_close),
254 DEF_MDT_HDL(HABEO_CORPUS, MDS_DONE_WRITING, mdt_done_writing),
257 static struct mdt_opc_slice mdt_readpage_handlers[] = {
259 .mos_opc_start = MDS_GETATTR,
260 .mos_opc_end = MDS_LAST_OPC,
261 .mos_hs = mdt_readpage_ops
264 .mos_opc_start = OBD_FIRST_OPC,
265 .mos_opc_end = OBD_LAST_OPC,
266 .mos_hs = mdt_obd_ops
273 /* Sequence service handlers */
274 #define DEF_SEQ_HDL(flags, name, fn) \
275 DEFINE_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name)
277 static struct mdt_handler mdt_seq_ops[] = {
278 DEF_SEQ_HDL(0, SEQ_QUERY, (void *)seq_query),
281 struct mdt_opc_slice mdt_seq_handlers[] = {
283 .mos_opc_start = SEQ_QUERY,
284 .mos_opc_end = SEQ_LAST_OPC,
285 .mos_hs = mdt_seq_ops
292 /* FID Location Database handlers */
293 #define DEF_FLD_HDL(flags, name, fn) \
294 DEFINE_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name)
296 static struct mdt_handler mdt_fld_ops[] = {
297 DEF_FLD_HDL(0, FLD_QUERY, (void *)fld_query),
300 struct mdt_opc_slice mdt_fld_handlers[] = {
302 .mos_opc_start = FLD_QUERY,
303 .mos_opc_end = FLD_LAST_OPC,
304 .mos_hs = mdt_fld_ops
311 /* Request with a format known in advance */
312 #define DEF_UPDATE_HDL(flags, name, fn) \
313 DEFINE_RPC_HANDLER(UPDATE_OBJ, flags, name, fn, &RQF_ ## name)
315 #define target_handler mdt_handler
316 static struct target_handler out_ops[] = {
317 DEF_UPDATE_HDL(MUTABOR, UPDATE_OBJ, out_handle),
320 static struct mdt_opc_slice update_handlers[] = {
322 .mos_opc_start = MDS_GETATTR,
323 .mos_opc_end = MDS_LAST_OPC,
324 .mos_hs = mdt_mds_ops
327 .mos_opc_start = OBD_PING,
328 .mos_opc_end = OBD_LAST_OPC,
329 .mos_hs = mdt_obd_ops
332 .mos_opc_start = LDLM_ENQUEUE,
333 .mos_opc_end = LDLM_LAST_OPC,
334 .mos_hs = mdt_dlm_ops
337 .mos_opc_start = SEC_CTX_INIT,
338 .mos_opc_end = SEC_LAST_OPC,
339 .mos_hs = mdt_sec_ctx_ops
342 .mos_opc_start = UPDATE_OBJ,
343 .mos_opc_end = UPDATE_LAST_OPC,
351 static int mds_regular_handle(struct ptlrpc_request *req)
353 return mdt_handle_common(req, mdt_regular_handlers);
356 static int mds_readpage_handle(struct ptlrpc_request *req)
358 return mdt_handle_common(req, mdt_readpage_handlers);
361 static int mds_mdsc_handle(struct ptlrpc_request *req)
363 return mdt_handle_common(req, mdt_seq_handlers);
366 static int mdt_out_handle(struct ptlrpc_request *req)
368 return mdt_handle_common(req, update_handlers);
371 static int mds_mdss_handle(struct ptlrpc_request *req)
373 return mdt_handle_common(req, mdt_seq_handlers);
376 static int mds_fld_handle(struct ptlrpc_request *req)
378 return mdt_handle_common(req, mdt_fld_handlers);
381 /* device init/fini methods */
382 static void mds_stop_ptlrpc_service(struct mds_device *m)
385 if (m->mds_regular_service != NULL) {
386 ptlrpc_unregister_service(m->mds_regular_service);
387 m->mds_regular_service = NULL;
389 if (m->mds_readpage_service != NULL) {
390 ptlrpc_unregister_service(m->mds_readpage_service);
391 m->mds_readpage_service = NULL;
393 if (m->mds_out_service != NULL) {
394 ptlrpc_unregister_service(m->mds_out_service);
395 m->mds_out_service = NULL;
397 if (m->mds_setattr_service != NULL) {
398 ptlrpc_unregister_service(m->mds_setattr_service);
399 m->mds_setattr_service = NULL;
401 if (m->mds_mdsc_service != NULL) {
402 ptlrpc_unregister_service(m->mds_mdsc_service);
403 m->mds_mdsc_service = NULL;
405 if (m->mds_mdss_service != NULL) {
406 ptlrpc_unregister_service(m->mds_mdss_service);
407 m->mds_mdss_service = NULL;
409 if (m->mds_fld_service != NULL) {
410 ptlrpc_unregister_service(m->mds_fld_service);
411 m->mds_fld_service = NULL;
416 static int mds_start_ptlrpc_service(struct mds_device *m)
418 static struct ptlrpc_service_conf conf;
419 struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
420 cfs_proc_dir_entry_t *procfs_entry;
424 procfs_entry = obd->obd_proc_entry;
425 LASSERT(procfs_entry != NULL);
427 conf = (typeof(conf)) {
428 .psc_name = LUSTRE_MDT_NAME,
429 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
431 .bc_nbufs = MDS_NBUFS,
432 .bc_buf_size = MDS_REG_BUFSIZE,
433 .bc_req_max_size = MDS_REG_MAXREQSIZE,
434 .bc_rep_max_size = MDS_REG_MAXREPSIZE,
435 .bc_req_portal = MDS_REQUEST_PORTAL,
436 .bc_rep_portal = MDC_REPLY_PORTAL,
439 * We'd like to have a mechanism to set this on a per-device
443 .tc_thr_name = LUSTRE_MDT_NAME,
444 .tc_thr_factor = MDS_THR_FACTOR,
445 .tc_nthrs_init = MDS_NTHRS_INIT,
446 .tc_nthrs_base = MDS_NTHRS_BASE,
447 .tc_nthrs_max = MDS_NTHRS_MAX,
448 .tc_nthrs_user = mds_num_threads,
449 .tc_cpu_affinity = 1,
450 .tc_ctx_tags = LCT_MD_THREAD,
453 .cc_pattern = mds_num_cpts,
456 .so_req_handler = mds_regular_handle,
457 .so_req_printer = target_print_req,
458 .so_hpreq_handler = ptlrpc_hpreq_handler,
461 m->mds_regular_service = ptlrpc_register_service(&conf, procfs_entry);
462 if (IS_ERR(m->mds_regular_service)) {
463 rc = PTR_ERR(m->mds_regular_service);
464 CERROR("failed to start regular mdt service: %d\n", rc);
465 m->mds_regular_service = NULL;
471 * readpage service configuration. Parameters have to be adjusted,
474 memset(&conf, 0, sizeof(conf));
475 conf = (typeof(conf)) {
476 .psc_name = LUSTRE_MDT_NAME "_readpage",
477 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
479 .bc_nbufs = MDS_NBUFS,
480 .bc_buf_size = MDS_BUFSIZE,
481 .bc_req_max_size = MDS_MAXREQSIZE,
482 .bc_rep_max_size = MDS_MAXREPSIZE,
483 .bc_req_portal = MDS_READPAGE_PORTAL,
484 .bc_rep_portal = MDC_REPLY_PORTAL,
487 .tc_thr_name = LUSTRE_MDT_NAME "_rdpg",
488 .tc_thr_factor = MDS_RDPG_THR_FACTOR,
489 .tc_nthrs_init = MDS_RDPG_NTHRS_INIT,
490 .tc_nthrs_base = MDS_RDPG_NTHRS_BASE,
491 .tc_nthrs_max = MDS_RDPG_NTHRS_MAX,
492 .tc_nthrs_user = mds_rdpg_num_threads,
493 .tc_cpu_affinity = 1,
494 .tc_ctx_tags = LCT_MD_THREAD,
497 .cc_pattern = mds_rdpg_num_cpts,
500 .so_req_handler = mds_readpage_handle,
501 .so_req_printer = target_print_req,
504 m->mds_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
505 if (IS_ERR(m->mds_readpage_service)) {
506 rc = PTR_ERR(m->mds_readpage_service);
507 CERROR("failed to start readpage service: %d\n", rc);
508 m->mds_readpage_service = NULL;
510 GOTO(err_mds_svc, rc);
514 * setattr service configuration.
516 * XXX To keep the compatibility with old client(< 2.2), we need to
517 * preserve this portal for a certain time, it should be removed
518 * eventually. LU-617.
520 memset(&conf, 0, sizeof(conf));
521 conf = (typeof(conf)) {
522 .psc_name = LUSTRE_MDT_NAME "_setattr",
523 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
525 .bc_nbufs = MDS_NBUFS,
526 .bc_buf_size = MDS_BUFSIZE,
527 .bc_req_max_size = MDS_MAXREQSIZE,
528 .bc_rep_max_size = MDS_LOV_MAXREPSIZE,
529 .bc_req_portal = MDS_SETATTR_PORTAL,
530 .bc_rep_portal = MDC_REPLY_PORTAL,
533 .tc_thr_name = LUSTRE_MDT_NAME "_attr",
534 .tc_thr_factor = MDS_SETA_THR_FACTOR,
535 .tc_nthrs_init = MDS_SETA_NTHRS_INIT,
536 .tc_nthrs_base = MDS_SETA_NTHRS_BASE,
537 .tc_nthrs_max = MDS_SETA_NTHRS_MAX,
538 .tc_nthrs_user = mds_attr_num_threads,
539 .tc_cpu_affinity = 1,
540 .tc_ctx_tags = LCT_MD_THREAD,
543 .cc_pattern = mds_attr_num_cpts,
546 .so_req_handler = mds_regular_handle,
547 .so_req_printer = target_print_req,
548 .so_hpreq_handler = NULL,
551 m->mds_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
552 if (IS_ERR(m->mds_setattr_service)) {
553 rc = PTR_ERR(m->mds_setattr_service);
554 CERROR("failed to start setattr service: %d\n", rc);
555 m->mds_setattr_service = NULL;
557 GOTO(err_mds_svc, rc);
560 /* Object update service */
561 conf = (typeof(conf)) {
562 .psc_name = LUSTRE_MDT_NAME "_out",
563 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
565 .bc_nbufs = MDS_NBUFS,
566 .bc_buf_size = MDS_OUT_BUFSIZE,
567 .bc_req_max_size = MDS_OUT_MAXREQSIZE,
568 .bc_rep_max_size = MDS_OUT_MAXREPSIZE,
569 .bc_req_portal = MDS_MDS_PORTAL,
570 .bc_rep_portal = MDC_REPLY_PORTAL,
573 * We'd like to have a mechanism to set this on a per-device
577 .tc_thr_name = LUSTRE_MDT_NAME "_out",
578 .tc_thr_factor = MDS_THR_FACTOR,
579 .tc_nthrs_init = MDS_NTHRS_INIT,
580 .tc_nthrs_base = MDS_NTHRS_BASE,
581 .tc_nthrs_max = MDS_NTHRS_MAX,
582 .tc_nthrs_user = mds_num_threads,
583 .tc_cpu_affinity = 1,
584 .tc_ctx_tags = LCT_MD_THREAD,
587 .cc_pattern = mds_num_cpts,
590 .so_req_handler = mdt_out_handle,
591 .so_req_printer = target_print_req,
592 .so_hpreq_handler = NULL,
595 m->mds_out_service = ptlrpc_register_service(&conf, procfs_entry);
596 if (IS_ERR(m->mds_out_service)) {
597 rc = PTR_ERR(m->mds_out_service);
598 CERROR("failed to start out service: %d\n", rc);
599 m->mds_out_service = NULL;
600 GOTO(err_mds_svc, rc);
604 * sequence controller service configuration
606 memset(&conf, 0, sizeof(conf));
607 conf = (typeof(conf)) {
608 .psc_name = LUSTRE_MDT_NAME "_seqs",
609 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
611 .bc_nbufs = MDS_NBUFS,
612 .bc_buf_size = SEQ_BUFSIZE,
613 .bc_req_max_size = SEQ_MAXREQSIZE,
614 .bc_rep_max_size = SEQ_MAXREPSIZE,
615 .bc_req_portal = SEQ_CONTROLLER_PORTAL,
616 .bc_rep_portal = MDC_REPLY_PORTAL,
619 .tc_thr_name = LUSTRE_MDT_NAME "_seqs",
620 .tc_nthrs_init = MDS_OTHR_NTHRS_INIT,
621 .tc_nthrs_max = MDS_OTHR_NTHRS_MAX,
622 .tc_ctx_tags = LCT_MD_THREAD,
625 .so_req_handler = mds_mdsc_handle,
626 .so_req_printer = target_print_req,
627 .so_hpreq_handler = NULL,
630 m->mds_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
631 if (IS_ERR(m->mds_mdsc_service)) {
632 rc = PTR_ERR(m->mds_mdsc_service);
633 CERROR("failed to start seq controller service: %d\n", rc);
634 m->mds_mdsc_service = NULL;
636 GOTO(err_mds_svc, rc);
640 * metadata sequence server service configuration
642 memset(&conf, 0, sizeof(conf));
643 conf = (typeof(conf)) {
644 .psc_name = LUSTRE_MDT_NAME "_seqm",
645 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
647 .bc_nbufs = MDS_NBUFS,
648 .bc_buf_size = SEQ_BUFSIZE,
649 .bc_req_max_size = SEQ_MAXREQSIZE,
650 .bc_rep_max_size = SEQ_MAXREPSIZE,
651 .bc_req_portal = SEQ_METADATA_PORTAL,
652 .bc_rep_portal = MDC_REPLY_PORTAL,
655 .tc_thr_name = LUSTRE_MDT_NAME "_seqm",
656 .tc_nthrs_init = MDS_OTHR_NTHRS_INIT,
657 .tc_nthrs_max = MDS_OTHR_NTHRS_MAX,
658 .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
661 .so_req_handler = mds_mdss_handle,
662 .so_req_printer = target_print_req,
663 .so_hpreq_handler = NULL,
666 m->mds_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
667 if (IS_ERR(m->mds_mdss_service)) {
668 rc = PTR_ERR(m->mds_mdss_service);
669 CERROR("failed to start metadata seq server service: %d\n", rc);
670 m->mds_mdss_service = NULL;
672 GOTO(err_mds_svc, rc);
675 /* FLD service start */
676 memset(&conf, 0, sizeof(conf));
677 conf = (typeof(conf)) {
678 .psc_name = LUSTRE_MDT_NAME "_fld",
679 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
681 .bc_nbufs = MDS_NBUFS,
682 .bc_buf_size = FLD_BUFSIZE,
683 .bc_req_max_size = FLD_MAXREQSIZE,
684 .bc_rep_max_size = FLD_MAXREPSIZE,
685 .bc_req_portal = FLD_REQUEST_PORTAL,
686 .bc_rep_portal = MDC_REPLY_PORTAL,
689 .tc_thr_name = LUSTRE_MDT_NAME "_fld",
690 .tc_nthrs_init = MDS_OTHR_NTHRS_INIT,
691 .tc_nthrs_max = MDS_OTHR_NTHRS_MAX,
692 .tc_ctx_tags = LCT_DT_THREAD | LCT_MD_THREAD
695 .so_req_handler = mds_fld_handle,
696 .so_req_printer = target_print_req,
697 .so_hpreq_handler = NULL,
700 m->mds_fld_service = ptlrpc_register_service(&conf, procfs_entry);
701 if (IS_ERR(m->mds_fld_service)) {
702 rc = PTR_ERR(m->mds_fld_service);
703 CERROR("failed to start fld service: %d\n", rc);
704 m->mds_fld_service = NULL;
706 GOTO(err_mds_svc, rc);
712 mds_stop_ptlrpc_service(m);
717 static inline struct mds_device *mds_dev(struct lu_device *d)
719 return container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
722 static struct lu_device *mds_device_fini(const struct lu_env *env,
725 struct mds_device *m = mds_dev(d);
726 struct obd_device *obd = d->ld_obd;
729 mds_stop_ptlrpc_service(m);
730 lprocfs_obd_cleanup(obd);
734 static struct lu_device *mds_device_free(const struct lu_env *env,
737 struct mds_device *m = mds_dev(d);
740 md_device_fini(&m->mds_md_dev);
745 static struct lu_device *mds_device_alloc(const struct lu_env *env,
746 struct lu_device_type *t,
747 struct lustre_cfg *cfg)
749 struct mds_device *m;
750 struct obd_device *obd;
756 return ERR_PTR(-ENOMEM);
758 md_device_init(&m->mds_md_dev, t);
759 l = &m->mds_md_dev.md_lu_dev;
761 obd = class_name2obd(lustre_cfg_string(cfg, 0));
762 LASSERT(obd != NULL);
765 /* set this lu_device to obd, because error handling need it */
768 rc = lprocfs_obd_setup(obd, lprocfs_mds_obd_vars);
770 mds_device_free(env, l);
775 rc = mds_start_ptlrpc_service(m);
778 mds_device_free(env, l);
786 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
787 LU_TYPE_INIT_FINI(mds, &mdt_thread_key);
789 static struct lu_device_type_operations mds_device_type_ops = {
790 .ldto_init = mds_type_init,
791 .ldto_fini = mds_type_fini,
793 .ldto_start = mds_type_start,
794 .ldto_stop = mds_type_stop,
796 .ldto_device_alloc = mds_device_alloc,
797 .ldto_device_free = mds_device_free,
798 .ldto_device_fini = mds_device_fini
801 static struct lu_device_type mds_device_type = {
802 .ldt_tags = LU_DEVICE_MD,
803 .ldt_name = LUSTRE_MDS_NAME,
804 .ldt_ops = &mds_device_type_ops,
805 .ldt_ctx_tags = LCT_MD_THREAD
808 static struct obd_ops mds_obd_device_ops = {
809 .o_owner = THIS_MODULE,
812 int mds_mod_init(void)
816 if (mdt_num_threads != 0 && mds_num_threads == 0) {
817 LCONSOLE_INFO("mdt_num_threads module parameter is deprecated, "
818 "use mds_num_threads instead or unset both for "
819 "dynamic thread startup\n");
820 mds_num_threads = mdt_num_threads;
823 rc = class_register_type(&mds_obd_device_ops, NULL,
824 lprocfs_mds_module_vars, LUSTRE_MDS_NAME,
829 void mds_mod_exit(void)
831 class_unregister_type(LUSTRE_MDS_NAME);