Whamcloud - gitweb
76fc9741a70a8b541de359c911303343de769499
[fs/lustre-release.git] / lustre / mdt / mdt_mds.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2012 Intel Corporation
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_mds.c
32  *
33  * Lustre Metadata Service Layer
34  *
35  * Author: Di Wang <di.wang@whamcloud.com>
36  **/
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 #include <obd_support.h>
43 /* struct ptlrpc_request */
44 #include <lustre_net.h>
45 /* struct obd_export */
46 #include <lustre_export.h>
47 /* struct obd_device */
48 #include <obd.h>
49 /* lu2dt_dev() */
50 #include <dt_object.h>
51 #include <lustre_mds.h>
52 #include <lustre_mdt.h>
53 #include "mdt_internal.h"
54 #ifdef HAVE_QUOTA_SUPPORT
55 # include <lustre_quota.h>
56 #endif
57 #include <lustre_acl.h>
58 #include <lustre_param.h>
59 #include <lustre_fsfilt.h>
60
/*
 * MDS device: the md_device super-class followed by one ptlrpc service
 * handle per request portal served by this file.  The handles are filled
 * in by mds_start_ptlrpc_service() and reset to NULL by
 * mds_stop_ptlrpc_service().
 */
struct mds_device {
        /* super-class */
        struct md_device           mds_md_dev;
        /* service on MDS_REQUEST_PORTAL */
        struct ptlrpc_service     *mds_regular_service;
        /* service on MDS_READPAGE_PORTAL */
        struct ptlrpc_service     *mds_readpage_service;
        /* service on MDS_SETATTR_PORTAL */
        struct ptlrpc_service     *mds_setattr_service;
        /* service on SEQ_CONTROLLER_PORTAL */
        struct ptlrpc_service     *mds_mdsc_service;
        /* service on SEQ_METADATA_PORTAL */
        struct ptlrpc_service     *mds_mdss_service;
        /* service on FLD_REQUEST_PORTAL */
        struct ptlrpc_service     *mds_fld_service;
};
71
/*
 * Module parameters (all registered with mode 0444).
 *
 * mdt_num_threads is the deprecated spelling of mds_num_threads:
 * mds_mod_init() copies it into mds_num_threads when only the old name
 * was set.
 */
static unsigned long mdt_num_threads;
CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
                "number of MDS service threads to start "
                "(deprecated in favor of mds_num_threads)");

/* passed as tc_nthrs_user of the regular service */
static unsigned long mds_num_threads;
CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
                "number of MDS service threads to start");

/* passed as cc_pattern of the regular service */
static char *mds_num_cpts;
CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
                "CPU partitions MDS threads should run on");

/* passed as tc_nthrs_user of the readpage service */
static unsigned long mds_rdpg_num_threads;
CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
                "number of MDS readpage service threads to start");

/* passed as cc_pattern of the readpage service */
static char *mds_rdpg_num_cpts;
CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
                "CPU partitions MDS readpage threads should run on");

/* NB: these two should be removed along with setattr service in the future */
static unsigned long mds_attr_num_threads;
CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
                "number of MDS setattr service threads to start");

static char *mds_attr_num_cpts;
CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
                "CPU partitions MDS setattr threads should run on");
104
/*
 * Build one designated-initializer entry of a struct mdt_handler table.
 *
 *   base   first opcode covered by the table; the entry is stored at
 *          index (opc - base)
 *   flags  value stored in mh_flags
 *   opc    opcode; it is also stringified into mh_name and pasted into
 *          OBD_FAIL_<opc>_NET, so it has to be a bare identifier
 *   fn     handler stored in mh_act
 *   fmt    request format stored in mh_fmt (may be NULL)
 *
 * Every argument used as an expression is parenthesized, so a compound
 * argument such as "BASE + 1" or "A | B" cannot regroup with the
 * surrounding tokens of the expansion.
 */
#define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt)                   \
[(opc) - (base)] = {                                                    \
        .mh_name        = #opc,                                         \
        .mh_fail_id     = OBD_FAIL_ ## opc ## _NET,                     \
        .mh_opc         = (opc),                                        \
        .mh_flags       = (flags),                                      \
        .mh_act         = (fn),                                         \
        .mh_fmt         = (fmt)                                         \
}
114
115 /* Request with a format known in advance */
116 #define DEF_MDT_HDL(flags, name, fn)                                    \
117         DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name)
118
119 /* Request with a format we do not yet know */
120 #define DEF_MDT_HDL_VAR(flags, name, fn)                                \
121         DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL)
122
123 /* Map one non-standard request format handler.  This should probably get
124  * a common OBD_SET_INFO RPC opcode instead of this mismatch. */
125 #define RQF_MDS_SET_INFO RQF_OBD_SET_INFO
126
127 static struct mdt_handler mdt_mds_ops[] = {
128 DEF_MDT_HDL(0,                          MDS_CONNECT,      mdt_connect),
129 DEF_MDT_HDL(0,                          MDS_DISCONNECT,   mdt_disconnect),
130 DEF_MDT_HDL(0,                          MDS_SET_INFO,     mdt_set_info),
131 DEF_MDT_HDL(0,                          MDS_GET_INFO,     mdt_get_info),
132 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_GETSTATUS,    mdt_getstatus),
133 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETATTR,      mdt_getattr),
134 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, mdt_getattr_name),
135 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETXATTR,     mdt_getxattr),
136 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_STATFS,       mdt_statfs),
137 DEF_MDT_HDL(0           | MUTABOR,      MDS_REINT,        mdt_reint),
138 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
139 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
140 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_PIN,          mdt_pin),
141 DEF_MDT_HDL_VAR(0,                      MDS_SYNC,         mdt_sync),
142 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR,    mdt_is_subdir),
143 DEF_MDT_HDL(0,                          MDS_QUOTACHECK,   mdt_quotacheck),
144 DEF_MDT_HDL(0,                          MDS_QUOTACTL,     mdt_quotactl),
145 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_PROGRESS, mdt_hsm_progress),
146 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_CT_REGISTER,
147                                                 mdt_hsm_ct_register),
148 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_CT_UNREGISTER,
149                                                 mdt_hsm_ct_unregister),
150 };
151
152 #define DEF_OBD_HDL(flags, name, fn)                                    \
153         DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL)
154
155 static struct mdt_handler mdt_obd_ops[] = {
156 DEF_OBD_HDL(0,                          OBD_PING,         mdt_obd_ping),
157 DEF_OBD_HDL(0,                          OBD_LOG_CANCEL,   mdt_obd_log_cancel),
158 DEF_OBD_HDL(0,                          OBD_QC_CALLBACK,  mdt_obd_qc_callback),
159 DEF_OBD_HDL(0,                          OBD_IDX_READ,     mdt_obd_idx_read)
160 };
161
162 #define DEF_DLM_HDL_VAR(flags, name, fn)                                \
163         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL)
164 #define DEF_DLM_HDL(flags, name, fn)                                    \
165         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name)
166
167 static struct mdt_handler mdt_dlm_ops[] = {
168 DEF_DLM_HDL    (HABEO_CLAVIS,           LDLM_ENQUEUE,     mdt_enqueue),
169 DEF_DLM_HDL_VAR(HABEO_CLAVIS,           LDLM_CONVERT,     mdt_convert),
170 DEF_DLM_HDL_VAR(0,                      LDLM_BL_CALLBACK, mdt_bl_callback),
171 DEF_DLM_HDL_VAR(0,                      LDLM_CP_CALLBACK, mdt_cp_callback)
172 };
173
174 #define DEF_LLOG_HDL(flags, name, fn)                                   \
175         DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL)
176
177 static struct mdt_handler mdt_llog_ops[] = {
178 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CREATE,        mdt_llog_create),
179 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_NEXT_BLOCK,    mdt_llog_next_block),
180 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_READ_HEADER,   mdt_llog_read_header),
181 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_WRITE_REC,     NULL),
182 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CLOSE,         NULL),
183 DEF_LLOG_HDL(0,         LLOG_ORIGIN_CONNECT,              NULL),
184 DEF_LLOG_HDL(0,         LLOG_CATINFO,                     NULL),
185 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_PREV_BLOCK,    mdt_llog_prev_block),
186 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_DESTROY,       mdt_llog_destroy),
187 };
188
189 #define DEF_SEC_HDL(flags, name, fn)                                    \
190         DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL)
191
192 static struct mdt_handler mdt_sec_ctx_ops[] = {
193 DEF_SEC_HDL(0,                          SEC_CTX_INIT,     mdt_sec_ctx_handle),
194 DEF_SEC_HDL(0,                          SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
195 DEF_SEC_HDL(0,                          SEC_CTX_FINI,     mdt_sec_ctx_handle)
196 };
197
198 #define DEF_QUOTA_HDL(flags, name, fn)                          \
199         DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name)
200
201 static struct mdt_handler mdt_quota_ops[] = {
202 DEF_QUOTA_HDL(HABEO_REFERO,             QUOTA_DQACQ,      mdt_quota_dqacq),
203 };
204
205 struct mdt_opc_slice mdt_regular_handlers[] = {
206         {
207                 .mos_opc_start  = MDS_GETATTR,
208                 .mos_opc_end    = MDS_LAST_OPC,
209                 .mos_hs         = mdt_mds_ops
210         },
211         {
212                 .mos_opc_start  = OBD_PING,
213                 .mos_opc_end    = OBD_LAST_OPC,
214                 .mos_hs         = mdt_obd_ops
215         },
216         {
217                 .mos_opc_start  = LDLM_ENQUEUE,
218                 .mos_opc_end    = LDLM_LAST_OPC,
219                 .mos_hs         = mdt_dlm_ops
220         },
221         {
222                 .mos_opc_start  = LLOG_ORIGIN_HANDLE_CREATE,
223                 .mos_opc_end    = LLOG_LAST_OPC,
224                 .mos_hs         = mdt_llog_ops
225         },
226         {
227                 .mos_opc_start  = SEC_CTX_INIT,
228                 .mos_opc_end    = SEC_LAST_OPC,
229                 .mos_hs         = mdt_sec_ctx_ops
230         },
231         {
232                 .mos_opc_start  = QUOTA_DQACQ,
233                 .mos_opc_end    = QUOTA_LAST_OPC,
234                 .mos_hs         = mdt_quota_ops
235         },
236         {
237                 .mos_hs         = NULL
238         }
239 };
240
241 /* Readpage/readdir handlers */
242 static struct mdt_handler mdt_readpage_ops[] = {
243 DEF_MDT_HDL(0,                  MDS_CONNECT,  mdt_connect),
244 DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage),
245 /* XXX: this is ugly and should be fixed one day, see mdc_close() for
246  * detailed comments. --umka */
247 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
248 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
249 };
250
251 static struct mdt_opc_slice mdt_readpage_handlers[] = {
252         {
253                 .mos_opc_start = MDS_GETATTR,
254                 .mos_opc_end   = MDS_LAST_OPC,
255                 .mos_hs = mdt_readpage_ops
256         },
257         {
258                 .mos_opc_start = OBD_FIRST_OPC,
259                 .mos_opc_end   = OBD_LAST_OPC,
260                 .mos_hs = mdt_obd_ops
261         },
262         {
263                 .mos_hs = NULL
264         }
265 };
266
267 /* Sequence service handlers */
268 #define DEF_SEQ_HDL(flags, name, fn)                                    \
269         DEFINE_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name)
270
271 static struct mdt_handler mdt_seq_ops[] = {
272 DEF_SEQ_HDL(0,                          SEQ_QUERY,        (void *)seq_query),
273 };
274
275 struct mdt_opc_slice mdt_seq_handlers[] = {
276         {
277                 .mos_opc_start = SEQ_QUERY,
278                 .mos_opc_end   = SEQ_LAST_OPC,
279                 .mos_hs = mdt_seq_ops
280         },
281         {
282                 .mos_hs = NULL
283         }
284 };
285
286 /* FID Location Database handlers */
287 #define DEF_FLD_HDL(flags, name, fn)                                    \
288         DEFINE_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name)
289
290 static struct mdt_handler mdt_fld_ops[] = {
291 DEF_FLD_HDL(0,                          FLD_QUERY,        (void *)fld_query),
292 };
293
294 struct mdt_opc_slice mdt_fld_handlers[] = {
295         {
296                 .mos_opc_start = FLD_QUERY,
297                 .mos_opc_end   = FLD_LAST_OPC,
298                 .mos_hs = mdt_fld_ops
299         },
300         {
301                 .mos_hs = NULL
302         }
303 };
304
305 static int mds_regular_handle(struct ptlrpc_request *req)
306 {
307         return mdt_handle_common(req, mdt_regular_handlers);
308 }
309
310 static int mds_readpage_handle(struct ptlrpc_request *req)
311 {
312         return mdt_handle_common(req, mdt_readpage_handlers);
313 }
314
315 static int mds_mdsc_handle(struct ptlrpc_request *req)
316 {
317         return mdt_handle_common(req, mdt_seq_handlers);
318 }
319
320 static int mds_mdss_handle(struct ptlrpc_request *req)
321 {
322         return mdt_handle_common(req, mdt_seq_handlers);
323 }
324
325 static int mds_fld_handle(struct ptlrpc_request *req)
326 {
327         return mdt_handle_common(req, mdt_fld_handlers);
328 }
329
330 /* device init/fini methods */
331 static void mds_stop_ptlrpc_service(struct mds_device *m)
332 {
333         ENTRY;
334         if (m->mds_regular_service != NULL) {
335                 ptlrpc_unregister_service(m->mds_regular_service);
336                 m->mds_regular_service = NULL;
337         }
338         if (m->mds_readpage_service != NULL) {
339                 ptlrpc_unregister_service(m->mds_readpage_service);
340                 m->mds_readpage_service = NULL;
341         }
342         if (m->mds_setattr_service != NULL) {
343                 ptlrpc_unregister_service(m->mds_setattr_service);
344                 m->mds_setattr_service = NULL;
345         }
346         if (m->mds_mdsc_service != NULL) {
347                 ptlrpc_unregister_service(m->mds_mdsc_service);
348                 m->mds_mdsc_service = NULL;
349         }
350         if (m->mds_mdss_service != NULL) {
351                 ptlrpc_unregister_service(m->mds_mdss_service);
352                 m->mds_mdss_service = NULL;
353         }
354         if (m->mds_fld_service != NULL) {
355                 ptlrpc_unregister_service(m->mds_fld_service);
356                 m->mds_fld_service = NULL;
357         }
358         EXIT;
359 }
360
361 static int mds_start_ptlrpc_service(struct mds_device *m)
362 {
363         static struct ptlrpc_service_conf conf;
364         struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
365         cfs_proc_dir_entry_t *procfs_entry;
366         int rc = 0;
367         ENTRY;
368
369         procfs_entry = obd->obd_proc_entry;
370         LASSERT(procfs_entry != NULL);
371
372         conf = (typeof(conf)) {
373                 .psc_name               = LUSTRE_MDT_NAME,
374                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
375                 .psc_buf                = {
376                         .bc_nbufs               = MDS_NBUFS,
377                         .bc_buf_size            = MDS_BUFSIZE,
378                         .bc_req_max_size        = MDS_MAXREQSIZE,
379                         .bc_rep_max_size        = MDS_MAXREPSIZE,
380                         .bc_req_portal          = MDS_REQUEST_PORTAL,
381                         .bc_rep_portal          = MDC_REPLY_PORTAL,
382                 },
383                 /*
384                  * We'd like to have a mechanism to set this on a per-device
385                  * basis, but alas...
386                  */
387                 .psc_thr                = {
388                         .tc_thr_name            = LUSTRE_MDT_NAME,
389                         .tc_thr_factor          = MDS_THR_FACTOR,
390                         .tc_nthrs_init          = MDS_NTHRS_INIT,
391                         .tc_nthrs_base          = MDS_NTHRS_BASE,
392                         .tc_nthrs_max           = MDS_NTHRS_MAX,
393                         .tc_nthrs_user          = mds_num_threads,
394                         .tc_cpu_affinity        = 1,
395                         .tc_ctx_tags            = LCT_MD_THREAD,
396                 },
397                 .psc_cpt                = {
398                         .cc_pattern             = mds_num_cpts,
399                 },
400                 .psc_ops                = {
401                         .so_req_handler         = mds_regular_handle,
402                         .so_req_printer         = target_print_req,
403                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
404                 },
405         };
406         m->mds_regular_service = ptlrpc_register_service(&conf, procfs_entry);
407         if (IS_ERR(m->mds_regular_service)) {
408                 rc = PTR_ERR(m->mds_regular_service);
409                 CERROR("failed to start regular mdt service: %d\n", rc);
410                 m->mds_regular_service = NULL;
411
412                 RETURN(rc);
413         }
414
415         /*
416          * readpage service configuration. Parameters have to be adjusted,
417          * ideally.
418          */
419         memset(&conf, 0, sizeof(conf));
420         conf = (typeof(conf)) {
421                 .psc_name               = LUSTRE_MDT_NAME "_readpage",
422                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
423                 .psc_buf                = {
424                         .bc_nbufs               = MDS_NBUFS,
425                         .bc_buf_size            = MDS_BUFSIZE,
426                         .bc_req_max_size        = MDS_MAXREQSIZE,
427                         .bc_rep_max_size        = MDS_MAXREPSIZE,
428                         .bc_req_portal          = MDS_READPAGE_PORTAL,
429                         .bc_rep_portal          = MDC_REPLY_PORTAL,
430                 },
431                 .psc_thr                = {
432                         .tc_thr_name            = LUSTRE_MDT_NAME "_rdpg",
433                         .tc_thr_factor          = MDS_RDPG_THR_FACTOR,
434                         .tc_nthrs_init          = MDS_RDPG_NTHRS_INIT,
435                         .tc_nthrs_base          = MDS_RDPG_NTHRS_BASE,
436                         .tc_nthrs_max           = MDS_RDPG_NTHRS_MAX,
437                         .tc_nthrs_user          = mds_rdpg_num_threads,
438                         .tc_cpu_affinity        = 1,
439                         .tc_ctx_tags            = LCT_MD_THREAD,
440                 },
441                 .psc_cpt                = {
442                         .cc_pattern             = mds_rdpg_num_cpts,
443                 },
444                 .psc_ops                = {
445                         .so_req_handler         = mds_readpage_handle,
446                         .so_req_printer         = target_print_req,
447                 },
448         };
449         m->mds_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
450         if (IS_ERR(m->mds_readpage_service)) {
451                 rc = PTR_ERR(m->mds_readpage_service);
452                 CERROR("failed to start readpage service: %d\n", rc);
453                 m->mds_readpage_service = NULL;
454
455                 GOTO(err_mds_svc, rc);
456         }
457
458         /*
459          * setattr service configuration.
460          *
461          * XXX To keep the compatibility with old client(< 2.2), we need to
462          * preserve this portal for a certain time, it should be removed
463          * eventually. LU-617.
464          */
465         memset(&conf, 0, sizeof(conf));
466         conf = (typeof(conf)) {
467                 .psc_name               = LUSTRE_MDT_NAME "_setattr",
468                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
469                 .psc_buf                = {
470                         .bc_nbufs               = MDS_NBUFS,
471                         .bc_buf_size            = MDS_BUFSIZE,
472                         .bc_req_max_size        = MDS_MAXREQSIZE,
473                         .bc_rep_max_size        = MDS_MAXREPSIZE,
474                         .bc_req_portal          = MDS_SETATTR_PORTAL,
475                         .bc_rep_portal          = MDC_REPLY_PORTAL,
476                 },
477                 .psc_thr                = {
478                         .tc_thr_name            = LUSTRE_MDT_NAME "_attr",
479                         .tc_thr_factor          = MDS_SETA_THR_FACTOR,
480                         .tc_nthrs_init          = MDS_SETA_NTHRS_INIT,
481                         .tc_nthrs_base          = MDS_SETA_NTHRS_BASE,
482                         .tc_nthrs_max           = MDS_SETA_NTHRS_MAX,
483                         .tc_nthrs_user          = mds_attr_num_threads,
484                         .tc_cpu_affinity        = 1,
485                         .tc_ctx_tags            = LCT_MD_THREAD,
486                 },
487                 .psc_cpt                = {
488                         .cc_pattern             = mds_attr_num_cpts,
489                 },
490                 .psc_ops                = {
491                         .so_req_handler         = mds_regular_handle,
492                         .so_req_printer         = target_print_req,
493                         .so_hpreq_handler       = NULL,
494                 },
495         };
496         m->mds_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
497         if (IS_ERR(m->mds_setattr_service)) {
498                 rc = PTR_ERR(m->mds_setattr_service);
499                 CERROR("failed to start setattr service: %d\n", rc);
500                 m->mds_setattr_service = NULL;
501
502                 GOTO(err_mds_svc, rc);
503         }
504
505         /*
506          * sequence controller service configuration
507          */
508         memset(&conf, 0, sizeof(conf));
509         conf = (typeof(conf)) {
510                 .psc_name               = LUSTRE_MDT_NAME "_seqs",
511                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
512                 .psc_buf                = {
513                         .bc_nbufs               = MDS_NBUFS,
514                         .bc_buf_size            = MDS_BUFSIZE,
515                         .bc_req_max_size        = SEQ_MAXREQSIZE,
516                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
517                         .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
518                         .bc_rep_portal          = MDC_REPLY_PORTAL,
519                 },
520                 .psc_thr                = {
521                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqs",
522                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
523                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
524                         .tc_ctx_tags            = LCT_MD_THREAD,
525                 },
526                 .psc_ops                = {
527                         .so_req_handler         = mds_mdsc_handle,
528                         .so_req_printer         = target_print_req,
529                         .so_hpreq_handler       = NULL,
530                 },
531         };
532         m->mds_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
533         if (IS_ERR(m->mds_mdsc_service)) {
534                 rc = PTR_ERR(m->mds_mdsc_service);
535                 CERROR("failed to start seq controller service: %d\n", rc);
536                 m->mds_mdsc_service = NULL;
537
538                 GOTO(err_mds_svc, rc);
539         }
540
541         /*
542          * metadata sequence server service configuration
543          */
544         memset(&conf, 0, sizeof(conf));
545         conf = (typeof(conf)) {
546                 .psc_name               = LUSTRE_MDT_NAME "_seqm",
547                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
548                 .psc_buf                = {
549                         .bc_nbufs               = MDS_NBUFS,
550                         .bc_buf_size            = MDS_BUFSIZE,
551                         .bc_req_max_size        = SEQ_MAXREQSIZE,
552                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
553                         .bc_req_portal          = SEQ_METADATA_PORTAL,
554                         .bc_rep_portal          = MDC_REPLY_PORTAL,
555                 },
556                 .psc_thr                = {
557                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqm",
558                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
559                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
560                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
561                 },
562                 .psc_ops                = {
563                         .so_req_handler         = mds_mdss_handle,
564                         .so_req_printer         = target_print_req,
565                         .so_hpreq_handler       = NULL,
566                 },
567         };
568         m->mds_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
569         if (IS_ERR(m->mds_mdss_service)) {
570                 rc = PTR_ERR(m->mds_mdss_service);
571                 CERROR("failed to start metadata seq server service: %d\n", rc);
572                 m->mds_mdss_service = NULL;
573
574                 GOTO(err_mds_svc, rc);
575         }
576
577         /* FLD service start */
578         memset(&conf, 0, sizeof(conf));
579         conf = (typeof(conf)) {
580                 .psc_name            = LUSTRE_MDT_NAME "_fld",
581                 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
582                 .psc_buf                = {
583                         .bc_nbufs               = MDS_NBUFS,
584                         .bc_buf_size            = MDS_BUFSIZE,
585                         .bc_req_max_size        = FLD_MAXREQSIZE,
586                         .bc_rep_max_size        = FLD_MAXREPSIZE,
587                         .bc_req_portal          = FLD_REQUEST_PORTAL,
588                         .bc_rep_portal          = MDC_REPLY_PORTAL,
589                 },
590                 .psc_thr                = {
591                         .tc_thr_name            = LUSTRE_MDT_NAME "_fld",
592                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
593                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
594                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD
595                 },
596                 .psc_ops                = {
597                         .so_req_handler         = mds_fld_handle,
598                         .so_req_printer         = target_print_req,
599                         .so_hpreq_handler       = NULL,
600                 },
601         };
602         m->mds_fld_service = ptlrpc_register_service(&conf, procfs_entry);
603         if (IS_ERR(m->mds_fld_service)) {
604                 rc = PTR_ERR(m->mds_fld_service);
605                 CERROR("failed to start fld service: %d\n", rc);
606                 m->mds_fld_service = NULL;
607
608                 GOTO(err_mds_svc, rc);
609         }
610
611         EXIT;
612 err_mds_svc:
613         if (rc)
614                 mds_stop_ptlrpc_service(m);
615
616         return rc;
617 }
618
619 static inline struct mds_device *mds_dev(struct lu_device *d)
620 {
621         return container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
622 }
623
624 static struct lu_device *mds_device_fini(const struct lu_env *env,
625                                          struct lu_device *d)
626 {
627         struct mds_device *m = mds_dev(d);
628         struct obd_device *obd = d->ld_obd;
629         ENTRY;
630
631         mds_stop_ptlrpc_service(m);
632         lprocfs_obd_cleanup(obd);
633         RETURN(NULL);
634 }
635
636 static struct lu_device *mds_device_free(const struct lu_env *env,
637                                          struct lu_device *d)
638 {
639         struct mds_device *m = mds_dev(d);
640         ENTRY;
641
642         md_device_fini(&m->mds_md_dev);
643         OBD_FREE_PTR(m);
644         RETURN(NULL);
645 }
646
647 static struct lu_device *mds_device_alloc(const struct lu_env *env,
648                                           struct lu_device_type *t,
649                                           struct lustre_cfg *cfg)
650 {
651         struct mds_device        *m;
652         struct obd_device        *obd;
653         struct lu_device          *l;
654         int rc;
655
656         OBD_ALLOC_PTR(m);
657         if (m == NULL)
658                 return ERR_PTR(-ENOMEM);
659
660         md_device_init(&m->mds_md_dev, t);
661         l = &m->mds_md_dev.md_lu_dev;
662
663         obd = class_name2obd(lustre_cfg_string(cfg, 0));
664         LASSERT(obd != NULL);
665
666         l->ld_obd = obd;
667         /* set this lu_device to obd, because error handling need it */
668         obd->obd_lu_dev = l;
669
670         rc = lprocfs_obd_setup(obd, lprocfs_mds_obd_vars);
671         if (rc != 0) {
672                 mds_device_free(env, l);
673                 l = ERR_PTR(rc);
674                 return l;
675         }
676
677         rc = mds_start_ptlrpc_service(m);
678
679         if (rc != 0) {
680                 mds_device_free(env, l);
681                 l = ERR_PTR(rc);
682                 return l;
683         }
684
685         return l;
686 }
687
/*
 * type constructor/destructor: provides the mds_type_init, mds_type_fini,
 * mds_type_start and mds_type_stop functions referenced by
 * mds_device_type_ops below (the macro itself is defined outside this
 * file).
 */
LU_TYPE_INIT_FINI(mds, &mdt_thread_key);
690
691 static struct lu_device_type_operations mds_device_type_ops = {
692         .ldto_init = mds_type_init,
693         .ldto_fini = mds_type_fini,
694
695         .ldto_start = mds_type_start,
696         .ldto_stop  = mds_type_stop,
697
698         .ldto_device_alloc = mds_device_alloc,
699         .ldto_device_free  = mds_device_free,
700         .ldto_device_fini  = mds_device_fini
701 };
702
703 static struct lu_device_type mds_device_type = {
704         .ldt_tags     = LU_DEVICE_MD,
705         .ldt_name     = LUSTRE_MDS_NAME,
706         .ldt_ops      = &mds_device_type_ops,
707         .ldt_ctx_tags = LCT_MD_THREAD
708 };
709
/*
 * obd method table of the MDS type; only the owning module is set, no obd
 * method is implemented here.
 */
static struct obd_ops mds_obd_device_ops = {
        .o_owner           = THIS_MODULE,
};
713
714 int mds_mod_init(void)
715 {
716         int rc;
717
718         if (mdt_num_threads != 0 && mds_num_threads == 0) {
719                 LCONSOLE_INFO("mdt_num_threads module parameter is deprecated, "
720                               "use mds_num_threads instead or unset both for "
721                               "dynamic thread startup\n");
722                 mds_num_threads = mdt_num_threads;
723         }
724
725         rc = class_register_type(&mds_obd_device_ops, NULL,
726                                  lprocfs_mds_module_vars, LUSTRE_MDS_NAME,
727                                  &mds_device_type);
728         return rc;
729 }
730
/* Unregister the obd type that mds_mod_init() registered. */
void mds_mod_exit(void)
{
        class_unregister_type(LUSTRE_MDS_NAME);
}