Whamcloud - gitweb
LU-2424 ptlrpc: buffer utilization of rqbd
[fs/lustre-release.git] / lustre / mdt / mdt_mds.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2012 Intel Corporation
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_mds.c
32  *
33  * Lustre Metadata Service Layer
34  *
35  * Author: Di Wang <di.wang@whamcloud.com>
36  **/
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 #include <obd_support.h>
43 /* struct ptlrpc_request */
44 #include <lustre_net.h>
45 /* struct obd_export */
46 #include <lustre_export.h>
47 /* struct obd_device */
48 #include <obd.h>
49 /* lu2dt_dev() */
50 #include <dt_object.h>
51 #include <lustre_mds.h>
52 #include <lustre_mdt.h>
53 #include "mdt_internal.h"
54 #include <lustre_quota.h>
55 #include <lustre_acl.h>
56 #include <lustre_param.h>
57 #include <lustre_fsfilt.h>
58
59 struct mds_device {
60         /* super-class */
61         struct md_device           mds_md_dev;
62         struct ptlrpc_service     *mds_regular_service;
63         struct ptlrpc_service     *mds_readpage_service;
64         struct ptlrpc_service     *mds_out_service;
65         struct ptlrpc_service     *mds_setattr_service;
66         struct ptlrpc_service     *mds_mdsc_service;
67         struct ptlrpc_service     *mds_mdss_service;
68         struct ptlrpc_service     *mds_fld_service;
69 };
70
71 /*
72  *  * Initialized in mdt_mod_init().
73  *   */
74 static unsigned long mdt_num_threads;
75 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
76                 "number of MDS service threads to start "
77                 "(deprecated in favor of mds_num_threads)");
78
79 static unsigned long mds_num_threads;
80 CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
81                 "number of MDS service threads to start");
82
83 static char *mds_num_cpts;
84 CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
85                 "CPU partitions MDS threads should run on");
86
87 static unsigned long mds_rdpg_num_threads;
88 CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
89                 "number of MDS readpage service threads to start");
90
91 static char *mds_rdpg_num_cpts;
92 CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
93                 "CPU partitions MDS readpage threads should run on");
94
95 /* NB: these two should be removed along with setattr service in the future */
96 static unsigned long mds_attr_num_threads;
97 CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
98                 "number of MDS setattr service threads to start");
99
100 static char *mds_attr_num_cpts;
101 CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
102                 "CPU partitions MDS setattr threads should run on");
103
/*
 * Emit one designated-initializer entry of a struct mdt_handler table.
 *
 * base  - first opcode covered by the table; the entry is stored at index
 *         (opc - base)
 * flags - value stored in mh_flags
 * opc   - opcode; it is also stringified into mh_name and pasted into the
 *         OBD_FAIL_<opc>_NET fail id, so it must be a plain identifier
 * fn    - handler stored in mh_act
 * fmt   - request format stored in mh_fmt
 *
 * Every argument used as an expression is parenthesized so that an
 * expression argument (e.g. a base written as "A + B") cannot change the
 * meaning of the index computation or of an initializer.
 */
#define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt)			\
[(opc) - (base)] = {							\
	.mh_name	= #opc,						\
	.mh_fail_id	= OBD_FAIL_ ## opc ## _NET,			\
	.mh_opc		= (opc),					\
	.mh_flags	= (flags),					\
	.mh_act		= (fn),						\
	.mh_fmt		= (fmt)						\
}
113
114 /* Request with a format known in advance */
115 #define DEF_MDT_HDL(flags, name, fn)                                    \
116         DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name)
117
118 /* Request with a format we do not yet know */
119 #define DEF_MDT_HDL_VAR(flags, name, fn)                                \
120         DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL)
121
122 /* Map one non-standard request format handler.  This should probably get
123  * a common OBD_SET_INFO RPC opcode instead of this mismatch. */
124 #define RQF_MDS_SET_INFO RQF_OBD_SET_INFO
125
126 static struct mdt_handler mdt_mds_ops[] = {
127 DEF_MDT_HDL(0,                          MDS_CONNECT,      mdt_connect),
128 DEF_MDT_HDL(0,                          MDS_DISCONNECT,   mdt_disconnect),
129 DEF_MDT_HDL(0,                          MDS_SET_INFO,     mdt_set_info),
130 DEF_MDT_HDL(0,                          MDS_GET_INFO,     mdt_get_info),
131 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_GETSTATUS,    mdt_getstatus),
132 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETATTR,      mdt_getattr),
133 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, mdt_getattr_name),
134 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETXATTR,     mdt_getxattr),
135 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_STATFS,       mdt_statfs),
136 DEF_MDT_HDL(0           | MUTABOR,      MDS_REINT,        mdt_reint),
137 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
138 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
139 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_PIN,          mdt_pin),
140 DEF_MDT_HDL_VAR(0,                      MDS_SYNC,         mdt_sync),
141 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR,    mdt_is_subdir),
142 DEF_MDT_HDL(0,                          MDS_QUOTACHECK,   mdt_quotacheck),
143 DEF_MDT_HDL(0,                          MDS_QUOTACTL,     mdt_quotactl),
144 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_PROGRESS, mdt_hsm_progress),
145 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_CT_REGISTER,
146                                                 mdt_hsm_ct_register),
147 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_CT_UNREGISTER,
148                                                 mdt_hsm_ct_unregister),
149 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET,
150                                                 mdt_hsm_state_get),
151 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_SET,
152                                                 mdt_hsm_state_set),
153 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action),
154 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_REQUEST, mdt_hsm_request),
155 DEF_MDT_HDL(HABEO_CORPUS|HABEO_REFERO,  MDS_SWAP_LAYOUTS, mdt_swap_layouts)
156 };
157
158 #define DEF_OBD_HDL(flags, name, fn)                                    \
159         DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL)
160
161 static struct mdt_handler mdt_obd_ops[] = {
162 DEF_OBD_HDL(0,                          OBD_PING,         mdt_obd_ping),
163 DEF_OBD_HDL(0,                          OBD_LOG_CANCEL,   mdt_obd_log_cancel),
164 DEF_OBD_HDL(0,                          OBD_QC_CALLBACK,  mdt_obd_qc_callback),
165 DEF_OBD_HDL(0,                          OBD_IDX_READ,     mdt_obd_idx_read)
166 };
167
168 #define DEF_DLM_HDL_VAR(flags, name, fn)                                \
169         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL)
170 #define DEF_DLM_HDL(flags, name, fn)                                    \
171         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name)
172
173 static struct mdt_handler mdt_dlm_ops[] = {
174 DEF_DLM_HDL    (HABEO_CLAVIS,           LDLM_ENQUEUE,     mdt_enqueue),
175 DEF_DLM_HDL_VAR(HABEO_CLAVIS,           LDLM_CONVERT,     mdt_convert),
176 DEF_DLM_HDL_VAR(0,                      LDLM_BL_CALLBACK, mdt_bl_callback),
177 DEF_DLM_HDL_VAR(0,                      LDLM_CP_CALLBACK, mdt_cp_callback)
178 };
179
180 #define DEF_LLOG_HDL(flags, name, fn)                                   \
181         DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL)
182
183 static struct mdt_handler mdt_llog_ops[] = {
184 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CREATE,        mdt_llog_create),
185 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_NEXT_BLOCK,    mdt_llog_next_block),
186 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_READ_HEADER,   mdt_llog_read_header),
187 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_WRITE_REC,     NULL),
188 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CLOSE,         NULL),
189 DEF_LLOG_HDL(0,         LLOG_ORIGIN_CONNECT,              NULL),
190 DEF_LLOG_HDL(0,         LLOG_CATINFO,                     NULL),
191 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_PREV_BLOCK,    mdt_llog_prev_block),
192 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_DESTROY,       mdt_llog_destroy),
193 };
194
195 #define DEF_SEC_HDL(flags, name, fn)                                    \
196         DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL)
197
198 static struct mdt_handler mdt_sec_ctx_ops[] = {
199 DEF_SEC_HDL(0,                          SEC_CTX_INIT,     mdt_sec_ctx_handle),
200 DEF_SEC_HDL(0,                          SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
201 DEF_SEC_HDL(0,                          SEC_CTX_FINI,     mdt_sec_ctx_handle)
202 };
203
204 #define DEF_QUOTA_HDL(flags, name, fn)                          \
205         DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name)
206
207 static struct mdt_handler mdt_quota_ops[] = {
208 DEF_QUOTA_HDL(HABEO_REFERO,             QUOTA_DQACQ,      mdt_quota_dqacq),
209 };
210
211 struct mdt_opc_slice mdt_regular_handlers[] = {
212         {
213                 .mos_opc_start  = MDS_GETATTR,
214                 .mos_opc_end    = MDS_LAST_OPC,
215                 .mos_hs         = mdt_mds_ops
216         },
217         {
218                 .mos_opc_start  = OBD_PING,
219                 .mos_opc_end    = OBD_LAST_OPC,
220                 .mos_hs         = mdt_obd_ops
221         },
222         {
223                 .mos_opc_start  = LDLM_ENQUEUE,
224                 .mos_opc_end    = LDLM_LAST_OPC,
225                 .mos_hs         = mdt_dlm_ops
226         },
227         {
228                 .mos_opc_start  = LLOG_ORIGIN_HANDLE_CREATE,
229                 .mos_opc_end    = LLOG_LAST_OPC,
230                 .mos_hs         = mdt_llog_ops
231         },
232         {
233                 .mos_opc_start  = SEC_CTX_INIT,
234                 .mos_opc_end    = SEC_LAST_OPC,
235                 .mos_hs         = mdt_sec_ctx_ops
236         },
237         {
238                 .mos_opc_start  = QUOTA_DQACQ,
239                 .mos_opc_end    = QUOTA_LAST_OPC,
240                 .mos_hs         = mdt_quota_ops
241         },
242         {
243                 .mos_hs         = NULL
244         }
245 };
246
247 /* Readpage/readdir handlers */
248 static struct mdt_handler mdt_readpage_ops[] = {
249 DEF_MDT_HDL(0,                  MDS_CONNECT,  mdt_connect),
250 DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage),
251 /* XXX: this is ugly and should be fixed one day, see mdc_close() for
252  * detailed comments. --umka */
253 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
254 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
255 };
256
257 static struct mdt_opc_slice mdt_readpage_handlers[] = {
258         {
259                 .mos_opc_start = MDS_GETATTR,
260                 .mos_opc_end   = MDS_LAST_OPC,
261                 .mos_hs = mdt_readpage_ops
262         },
263         {
264                 .mos_opc_start = OBD_FIRST_OPC,
265                 .mos_opc_end   = OBD_LAST_OPC,
266                 .mos_hs = mdt_obd_ops
267         },
268         {
269                 .mos_hs = NULL
270         }
271 };
272
273 /* Sequence service handlers */
274 #define DEF_SEQ_HDL(flags, name, fn)                                    \
275         DEFINE_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name)
276
277 static struct mdt_handler mdt_seq_ops[] = {
278 DEF_SEQ_HDL(0,                          SEQ_QUERY,        (void *)seq_query),
279 };
280
281 struct mdt_opc_slice mdt_seq_handlers[] = {
282         {
283                 .mos_opc_start = SEQ_QUERY,
284                 .mos_opc_end   = SEQ_LAST_OPC,
285                 .mos_hs = mdt_seq_ops
286         },
287         {
288                 .mos_hs = NULL
289         }
290 };
291
292 /* FID Location Database handlers */
293 #define DEF_FLD_HDL(flags, name, fn)                                    \
294         DEFINE_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name)
295
296 static struct mdt_handler mdt_fld_ops[] = {
297 DEF_FLD_HDL(0,                          FLD_QUERY,        (void *)fld_query),
298 };
299
300 struct mdt_opc_slice mdt_fld_handlers[] = {
301         {
302                 .mos_opc_start = FLD_QUERY,
303                 .mos_opc_end   = FLD_LAST_OPC,
304                 .mos_hs = mdt_fld_ops
305         },
306         {
307                 .mos_hs = NULL
308         }
309 };
310
311 /* Request with a format known in advance */
312 #define DEF_UPDATE_HDL(flags, name, fn)                                 \
313         DEFINE_RPC_HANDLER(UPDATE_OBJ, flags, name, fn, &RQF_ ## name)
314
315 #define target_handler mdt_handler
316 static struct target_handler out_ops[] = {
317         DEF_UPDATE_HDL(MUTABOR,         UPDATE_OBJ,     out_handle),
318 };
319
320 static struct mdt_opc_slice update_handlers[] = {
321         {
322                 .mos_opc_start = MDS_GETATTR,
323                 .mos_opc_end   = MDS_LAST_OPC,
324                 .mos_hs        = mdt_mds_ops
325         },
326         {
327                 .mos_opc_start = OBD_PING,
328                 .mos_opc_end   = OBD_LAST_OPC,
329                 .mos_hs        = mdt_obd_ops
330         },
331         {
332                 .mos_opc_start = LDLM_ENQUEUE,
333                 .mos_opc_end   = LDLM_LAST_OPC,
334                 .mos_hs        = mdt_dlm_ops
335         },
336         {
337                 .mos_opc_start = SEC_CTX_INIT,
338                 .mos_opc_end   = SEC_LAST_OPC,
339                 .mos_hs        = mdt_sec_ctx_ops
340         },
341         {
342                 .mos_opc_start = UPDATE_OBJ,
343                 .mos_opc_end   = UPDATE_LAST_OPC,
344                 .mos_hs        = out_ops
345         },
346         {
347                 .mos_hs        = NULL
348         }
349 };
350
351 static int mds_regular_handle(struct ptlrpc_request *req)
352 {
353         return mdt_handle_common(req, mdt_regular_handlers);
354 }
355
356 static int mds_readpage_handle(struct ptlrpc_request *req)
357 {
358         return mdt_handle_common(req, mdt_readpage_handlers);
359 }
360
361 static int mds_mdsc_handle(struct ptlrpc_request *req)
362 {
363         return mdt_handle_common(req, mdt_seq_handlers);
364 }
365
366 static int mdt_out_handle(struct ptlrpc_request *req)
367 {
368         return mdt_handle_common(req, update_handlers);
369 }
370
371 static int mds_mdss_handle(struct ptlrpc_request *req)
372 {
373         return mdt_handle_common(req, mdt_seq_handlers);
374 }
375
376 static int mds_fld_handle(struct ptlrpc_request *req)
377 {
378         return mdt_handle_common(req, mdt_fld_handlers);
379 }
380
381 /* device init/fini methods */
382 static void mds_stop_ptlrpc_service(struct mds_device *m)
383 {
384         ENTRY;
385         if (m->mds_regular_service != NULL) {
386                 ptlrpc_unregister_service(m->mds_regular_service);
387                 m->mds_regular_service = NULL;
388         }
389         if (m->mds_readpage_service != NULL) {
390                 ptlrpc_unregister_service(m->mds_readpage_service);
391                 m->mds_readpage_service = NULL;
392         }
393         if (m->mds_out_service != NULL) {
394                 ptlrpc_unregister_service(m->mds_out_service);
395                 m->mds_out_service = NULL;
396         }
397         if (m->mds_setattr_service != NULL) {
398                 ptlrpc_unregister_service(m->mds_setattr_service);
399                 m->mds_setattr_service = NULL;
400         }
401         if (m->mds_mdsc_service != NULL) {
402                 ptlrpc_unregister_service(m->mds_mdsc_service);
403                 m->mds_mdsc_service = NULL;
404         }
405         if (m->mds_mdss_service != NULL) {
406                 ptlrpc_unregister_service(m->mds_mdss_service);
407                 m->mds_mdss_service = NULL;
408         }
409         if (m->mds_fld_service != NULL) {
410                 ptlrpc_unregister_service(m->mds_fld_service);
411                 m->mds_fld_service = NULL;
412         }
413         EXIT;
414 }
415
416 static int mds_start_ptlrpc_service(struct mds_device *m)
417 {
418         static struct ptlrpc_service_conf conf;
419         struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
420         cfs_proc_dir_entry_t *procfs_entry;
421         int rc = 0;
422         ENTRY;
423
424         procfs_entry = obd->obd_proc_entry;
425         LASSERT(procfs_entry != NULL);
426
427         conf = (typeof(conf)) {
428                 .psc_name               = LUSTRE_MDT_NAME,
429                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
430                 .psc_buf                = {
431                         .bc_nbufs               = MDS_NBUFS,
432                         .bc_buf_size            = MDS_LOV_BUFSIZE,
433                         .bc_req_max_size        = MDS_LOV_MAXREQSIZE,
434                         .bc_rep_max_size        = MDS_LOV_MAXREPSIZE,
435                         .bc_req_portal          = MDS_REQUEST_PORTAL,
436                         .bc_rep_portal          = MDC_REPLY_PORTAL,
437                 },
438                 /*
439                  * We'd like to have a mechanism to set this on a per-device
440                  * basis, but alas...
441                  */
442                 .psc_thr                = {
443                         .tc_thr_name            = LUSTRE_MDT_NAME,
444                         .tc_thr_factor          = MDS_THR_FACTOR,
445                         .tc_nthrs_init          = MDS_NTHRS_INIT,
446                         .tc_nthrs_base          = MDS_NTHRS_BASE,
447                         .tc_nthrs_max           = MDS_NTHRS_MAX,
448                         .tc_nthrs_user          = mds_num_threads,
449                         .tc_cpu_affinity        = 1,
450                         .tc_ctx_tags            = LCT_MD_THREAD,
451                 },
452                 .psc_cpt                = {
453                         .cc_pattern             = mds_num_cpts,
454                 },
455                 .psc_ops                = {
456                         .so_req_handler         = mds_regular_handle,
457                         .so_req_printer         = target_print_req,
458                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
459                 },
460         };
461         m->mds_regular_service = ptlrpc_register_service(&conf, procfs_entry);
462         if (IS_ERR(m->mds_regular_service)) {
463                 rc = PTR_ERR(m->mds_regular_service);
464                 CERROR("failed to start regular mdt service: %d\n", rc);
465                 m->mds_regular_service = NULL;
466
467                 RETURN(rc);
468         }
469
470         /*
471          * readpage service configuration. Parameters have to be adjusted,
472          * ideally.
473          */
474         memset(&conf, 0, sizeof(conf));
475         conf = (typeof(conf)) {
476                 .psc_name               = LUSTRE_MDT_NAME "_readpage",
477                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
478                 .psc_buf                = {
479                         .bc_nbufs               = MDS_NBUFS,
480                         .bc_buf_size            = MDS_BUFSIZE,
481                         .bc_req_max_size        = MDS_MAXREQSIZE,
482                         .bc_rep_max_size        = MDS_MAXREPSIZE,
483                         .bc_req_portal          = MDS_READPAGE_PORTAL,
484                         .bc_rep_portal          = MDC_REPLY_PORTAL,
485                 },
486                 .psc_thr                = {
487                         .tc_thr_name            = LUSTRE_MDT_NAME "_rdpg",
488                         .tc_thr_factor          = MDS_RDPG_THR_FACTOR,
489                         .tc_nthrs_init          = MDS_RDPG_NTHRS_INIT,
490                         .tc_nthrs_base          = MDS_RDPG_NTHRS_BASE,
491                         .tc_nthrs_max           = MDS_RDPG_NTHRS_MAX,
492                         .tc_nthrs_user          = mds_rdpg_num_threads,
493                         .tc_cpu_affinity        = 1,
494                         .tc_ctx_tags            = LCT_MD_THREAD,
495                 },
496                 .psc_cpt                = {
497                         .cc_pattern             = mds_rdpg_num_cpts,
498                 },
499                 .psc_ops                = {
500                         .so_req_handler         = mds_readpage_handle,
501                         .so_req_printer         = target_print_req,
502                 },
503         };
504         m->mds_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
505         if (IS_ERR(m->mds_readpage_service)) {
506                 rc = PTR_ERR(m->mds_readpage_service);
507                 CERROR("failed to start readpage service: %d\n", rc);
508                 m->mds_readpage_service = NULL;
509
510                 GOTO(err_mds_svc, rc);
511         }
512
513         /*
514          * setattr service configuration.
515          *
516          * XXX To keep the compatibility with old client(< 2.2), we need to
517          * preserve this portal for a certain time, it should be removed
518          * eventually. LU-617.
519          */
520         memset(&conf, 0, sizeof(conf));
521         conf = (typeof(conf)) {
522                 .psc_name               = LUSTRE_MDT_NAME "_setattr",
523                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
524                 .psc_buf                = {
525                         .bc_nbufs               = MDS_NBUFS,
526                         .bc_buf_size            = MDS_BUFSIZE,
527                         .bc_req_max_size        = MDS_MAXREQSIZE,
528                         .bc_rep_max_size        = MDS_LOV_MAXREPSIZE,
529                         .bc_req_portal          = MDS_SETATTR_PORTAL,
530                         .bc_rep_portal          = MDC_REPLY_PORTAL,
531                 },
532                 .psc_thr                = {
533                         .tc_thr_name            = LUSTRE_MDT_NAME "_attr",
534                         .tc_thr_factor          = MDS_SETA_THR_FACTOR,
535                         .tc_nthrs_init          = MDS_SETA_NTHRS_INIT,
536                         .tc_nthrs_base          = MDS_SETA_NTHRS_BASE,
537                         .tc_nthrs_max           = MDS_SETA_NTHRS_MAX,
538                         .tc_nthrs_user          = mds_attr_num_threads,
539                         .tc_cpu_affinity        = 1,
540                         .tc_ctx_tags            = LCT_MD_THREAD,
541                 },
542                 .psc_cpt                = {
543                         .cc_pattern             = mds_attr_num_cpts,
544                 },
545                 .psc_ops                = {
546                         .so_req_handler         = mds_regular_handle,
547                         .so_req_printer         = target_print_req,
548                         .so_hpreq_handler       = NULL,
549                 },
550         };
551         m->mds_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
552         if (IS_ERR(m->mds_setattr_service)) {
553                 rc = PTR_ERR(m->mds_setattr_service);
554                 CERROR("failed to start setattr service: %d\n", rc);
555                 m->mds_setattr_service = NULL;
556
557                 GOTO(err_mds_svc, rc);
558         }
559
560         /* Object update service */
561         conf = (typeof(conf)) {
562                 .psc_name               = LUSTRE_MDT_NAME "_out",
563                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
564                 .psc_buf                = {
565                         .bc_nbufs               = MDS_NBUFS,
566                         .bc_buf_size            = MDS_BUFSIZE,
567                         .bc_req_max_size        = MDS_MAXREQSIZE,
568                         .bc_rep_max_size        = MDS_MAXREPSIZE,
569                         .bc_req_portal          = MDS_MDS_PORTAL,
570                         .bc_rep_portal          = MDC_REPLY_PORTAL,
571                 },
572                 /*
573                  * We'd like to have a mechanism to set this on a per-device
574                  * basis, but alas...
575                  */
576                 .psc_thr                = {
577                         .tc_thr_name            = LUSTRE_MDT_NAME "_out",
578                         .tc_thr_factor          = MDS_THR_FACTOR,
579                         .tc_nthrs_init          = MDS_NTHRS_INIT,
580                         .tc_nthrs_base          = MDS_NTHRS_BASE,
581                         .tc_nthrs_max           = MDS_NTHRS_MAX,
582                         .tc_nthrs_user          = mds_num_threads,
583                         .tc_cpu_affinity        = 1,
584                         .tc_ctx_tags            = LCT_MD_THREAD,
585                 },
586                 .psc_cpt                = {
587                         .cc_pattern             = mds_num_cpts,
588                 },
589                 .psc_ops                = {
590                         .so_req_handler         = mdt_out_handle,
591                         .so_req_printer         = target_print_req,
592                         .so_hpreq_handler       = NULL,
593                 },
594         };
595         m->mds_out_service = ptlrpc_register_service(&conf, procfs_entry);
596         if (IS_ERR(m->mds_out_service)) {
597                 rc = PTR_ERR(m->mds_out_service);
598                 CERROR("failed to start out service: %d\n", rc);
599                 m->mds_out_service = NULL;
600                 GOTO(err_mds_svc, rc);
601         }
602
603         /*
604          * sequence controller service configuration
605          */
606         memset(&conf, 0, sizeof(conf));
607         conf = (typeof(conf)) {
608                 .psc_name               = LUSTRE_MDT_NAME "_seqs",
609                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
610                 .psc_buf                = {
611                         .bc_nbufs               = MDS_NBUFS,
612                         .bc_buf_size            = SEQ_BUFSIZE,
613                         .bc_req_max_size        = SEQ_MAXREQSIZE,
614                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
615                         .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
616                         .bc_rep_portal          = MDC_REPLY_PORTAL,
617                 },
618                 .psc_thr                = {
619                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqs",
620                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
621                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
622                         .tc_ctx_tags            = LCT_MD_THREAD,
623                 },
624                 .psc_ops                = {
625                         .so_req_handler         = mds_mdsc_handle,
626                         .so_req_printer         = target_print_req,
627                         .so_hpreq_handler       = NULL,
628                 },
629         };
630         m->mds_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
631         if (IS_ERR(m->mds_mdsc_service)) {
632                 rc = PTR_ERR(m->mds_mdsc_service);
633                 CERROR("failed to start seq controller service: %d\n", rc);
634                 m->mds_mdsc_service = NULL;
635
636                 GOTO(err_mds_svc, rc);
637         }
638
639         /*
640          * metadata sequence server service configuration
641          */
642         memset(&conf, 0, sizeof(conf));
643         conf = (typeof(conf)) {
644                 .psc_name               = LUSTRE_MDT_NAME "_seqm",
645                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
646                 .psc_buf                = {
647                         .bc_nbufs               = MDS_NBUFS,
648                         .bc_buf_size            = SEQ_BUFSIZE,
649                         .bc_req_max_size        = SEQ_MAXREQSIZE,
650                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
651                         .bc_req_portal          = SEQ_METADATA_PORTAL,
652                         .bc_rep_portal          = MDC_REPLY_PORTAL,
653                 },
654                 .psc_thr                = {
655                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqm",
656                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
657                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
658                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
659                 },
660                 .psc_ops                = {
661                         .so_req_handler         = mds_mdss_handle,
662                         .so_req_printer         = target_print_req,
663                         .so_hpreq_handler       = NULL,
664                 },
665         };
666         m->mds_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
667         if (IS_ERR(m->mds_mdss_service)) {
668                 rc = PTR_ERR(m->mds_mdss_service);
669                 CERROR("failed to start metadata seq server service: %d\n", rc);
670                 m->mds_mdss_service = NULL;
671
672                 GOTO(err_mds_svc, rc);
673         }
674
675         /* FLD service start */
676         memset(&conf, 0, sizeof(conf));
677         conf = (typeof(conf)) {
678                 .psc_name            = LUSTRE_MDT_NAME "_fld",
679                 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
680                 .psc_buf                = {
681                         .bc_nbufs               = MDS_NBUFS,
682                         .bc_buf_size            = FLD_BUFSIZE,
683                         .bc_req_max_size        = FLD_MAXREQSIZE,
684                         .bc_rep_max_size        = FLD_MAXREPSIZE,
685                         .bc_req_portal          = FLD_REQUEST_PORTAL,
686                         .bc_rep_portal          = MDC_REPLY_PORTAL,
687                 },
688                 .psc_thr                = {
689                         .tc_thr_name            = LUSTRE_MDT_NAME "_fld",
690                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
691                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
692                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD
693                 },
694                 .psc_ops                = {
695                         .so_req_handler         = mds_fld_handle,
696                         .so_req_printer         = target_print_req,
697                         .so_hpreq_handler       = NULL,
698                 },
699         };
700         m->mds_fld_service = ptlrpc_register_service(&conf, procfs_entry);
701         if (IS_ERR(m->mds_fld_service)) {
702                 rc = PTR_ERR(m->mds_fld_service);
703                 CERROR("failed to start fld service: %d\n", rc);
704                 m->mds_fld_service = NULL;
705
706                 GOTO(err_mds_svc, rc);
707         }
708
709         EXIT;
710 err_mds_svc:
711         if (rc)
712                 mds_stop_ptlrpc_service(m);
713
714         return rc;
715 }
716
717 static inline struct mds_device *mds_dev(struct lu_device *d)
718 {
719         return container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
720 }
721
722 static struct lu_device *mds_device_fini(const struct lu_env *env,
723                                          struct lu_device *d)
724 {
725         struct mds_device *m = mds_dev(d);
726         struct obd_device *obd = d->ld_obd;
727         ENTRY;
728
729         mds_stop_ptlrpc_service(m);
730         lprocfs_obd_cleanup(obd);
731         RETURN(NULL);
732 }
733
734 static struct lu_device *mds_device_free(const struct lu_env *env,
735                                          struct lu_device *d)
736 {
737         struct mds_device *m = mds_dev(d);
738         ENTRY;
739
740         md_device_fini(&m->mds_md_dev);
741         OBD_FREE_PTR(m);
742         RETURN(NULL);
743 }
744
745 static struct lu_device *mds_device_alloc(const struct lu_env *env,
746                                           struct lu_device_type *t,
747                                           struct lustre_cfg *cfg)
748 {
749         struct mds_device        *m;
750         struct obd_device        *obd;
751         struct lu_device          *l;
752         int rc;
753
754         OBD_ALLOC_PTR(m);
755         if (m == NULL)
756                 return ERR_PTR(-ENOMEM);
757
758         md_device_init(&m->mds_md_dev, t);
759         l = &m->mds_md_dev.md_lu_dev;
760
761         obd = class_name2obd(lustre_cfg_string(cfg, 0));
762         LASSERT(obd != NULL);
763
764         l->ld_obd = obd;
765         /* set this lu_device to obd, because error handling need it */
766         obd->obd_lu_dev = l;
767
768         rc = lprocfs_obd_setup(obd, lprocfs_mds_obd_vars);
769         if (rc != 0) {
770                 mds_device_free(env, l);
771                 l = ERR_PTR(rc);
772                 return l;
773         }
774
775         rc = mds_start_ptlrpc_service(m);
776
777         if (rc != 0) {
778                 mds_device_free(env, l);
779                 l = ERR_PTR(rc);
780                 return l;
781         }
782
783         return l;
784 }
785
/* type constructor/destructor: mds_type_init, mds_type_fini */
LU_TYPE_INIT_FINI(mds, &mdt_thread_key);
788
789 static struct lu_device_type_operations mds_device_type_ops = {
790         .ldto_init = mds_type_init,
791         .ldto_fini = mds_type_fini,
792
793         .ldto_start = mds_type_start,
794         .ldto_stop  = mds_type_stop,
795
796         .ldto_device_alloc = mds_device_alloc,
797         .ldto_device_free  = mds_device_free,
798         .ldto_device_fini  = mds_device_fini
799 };
800
801 static struct lu_device_type mds_device_type = {
802         .ldt_tags     = LU_DEVICE_MD,
803         .ldt_name     = LUSTRE_MDS_NAME,
804         .ldt_ops      = &mds_device_type_ops,
805         .ldt_ctx_tags = LCT_MD_THREAD
806 };
807
808 static struct obd_ops mds_obd_device_ops = {
809         .o_owner           = THIS_MODULE,
810 };
811
812 int mds_mod_init(void)
813 {
814         int rc;
815
816         if (mdt_num_threads != 0 && mds_num_threads == 0) {
817                 LCONSOLE_INFO("mdt_num_threads module parameter is deprecated, "
818                               "use mds_num_threads instead or unset both for "
819                               "dynamic thread startup\n");
820                 mds_num_threads = mdt_num_threads;
821         }
822
823         rc = class_register_type(&mds_obd_device_ops, NULL,
824                                  lprocfs_mds_module_vars, LUSTRE_MDS_NAME,
825                                  &mds_device_type);
826         return rc;
827 }
828
829 void mds_mod_exit(void)
830 {
831         class_unregister_type(LUSTRE_MDS_NAME);
832 }