Whamcloud - gitweb
LU-1330 obdclass: add obd_target.h
[fs/lustre-release.git] / lustre / mdt / mdt_mds.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2013, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_mds.c
32  *
33  * Lustre Metadata Service Layer
34  *
35  * Author: Di Wang <di.wang@whamcloud.com>
36  **/
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 #include <obd_support.h>
43 /* struct ptlrpc_request */
44 #include <lustre_net.h>
45 /* struct obd_export */
46 #include <lustre_export.h>
47 /* struct obd_device */
48 #include <obd.h>
49 /* lu2dt_dev() */
50 #include <dt_object.h>
51 #include <lustre_mds.h>
52 #include <lustre_mdt.h>
53 #include "mdt_internal.h"
54 #include <lustre_quota.h>
55 #include <lustre_acl.h>
56 #include <lustre_param.h>
57
58 struct mds_device {
59         /* super-class */
60         struct md_device           mds_md_dev;
61         struct ptlrpc_service     *mds_regular_service;
62         struct ptlrpc_service     *mds_readpage_service;
63         struct ptlrpc_service     *mds_out_service;
64         struct ptlrpc_service     *mds_setattr_service;
65         struct ptlrpc_service     *mds_mdsc_service;
66         struct ptlrpc_service     *mds_mdss_service;
67         struct ptlrpc_service     *mds_fld_service;
68 };
69
70 /*
71  *  * Initialized in mdt_mod_init().
72  *   */
73 static unsigned long mdt_num_threads;
74 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
75                 "number of MDS service threads to start "
76                 "(deprecated in favor of mds_num_threads)");
77
78 static unsigned long mds_num_threads;
79 CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
80                 "number of MDS service threads to start");
81
82 static char *mds_num_cpts;
83 CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
84                 "CPU partitions MDS threads should run on");
85
86 static unsigned long mds_rdpg_num_threads;
87 CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
88                 "number of MDS readpage service threads to start");
89
90 static char *mds_rdpg_num_cpts;
91 CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
92                 "CPU partitions MDS readpage threads should run on");
93
94 /* NB: these two should be removed along with setattr service in the future */
95 static unsigned long mds_attr_num_threads;
96 CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
97                 "number of MDS setattr service threads to start");
98
99 static char *mds_attr_num_cpts;
100 CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
101                 "CPU partitions MDS setattr threads should run on");
102
/*
 * Emit one designated initializer for a struct mdt_handler table.
 *
 * The entry lands at index (opc - base), so a table built with a given
 * \a base holds the handler for opcode X in slot X - base.
 *
 * \a opc is stringified for mh_name and token-pasted into
 * OBD_FAIL_<opc>_NET for mh_fail_id, so it has to be passed as a bare
 * opcode identifier.  The remaining arguments are ordinary expressions and
 * are parenthesized so that an argument such as "A | B" cannot re-associate
 * with the surrounding initializer.
 */
#define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt)                   \
[(opc) - (base)] = {                                                    \
        .mh_name        = #opc,                                         \
        .mh_fail_id     = OBD_FAIL_ ## opc ## _NET,                     \
        .mh_opc         = (opc),                                        \
        .mh_flags       = (flags),                                      \
        .mh_act         = (fn),                                         \
        .mh_fmt         = (fmt)                                         \
}

/* Request with a format known in advance: mh_fmt is &RQF_<name>. */
#define DEF_MDT_HDL(flags, name, fn)                                    \
        DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name)

/* Request with a format we do not yet know: mh_fmt is NULL. */
#define DEF_MDT_HDL_VAR(flags, name, fn)                                \
        DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL)

/* Map one non-standard request format handler.  This should probably get
 * a common OBD_SET_INFO RPC opcode instead of this mismatch. */
#define RQF_MDS_SET_INFO RQF_OBD_SET_INFO
124
125 static struct mdt_handler mdt_mds_ops[] = {
126 DEF_MDT_HDL(0,                          MDS_CONNECT,      mdt_connect),
127 DEF_MDT_HDL(0,                          MDS_DISCONNECT,   mdt_disconnect),
128 DEF_MDT_HDL(0,                          MDS_SET_INFO,     mdt_set_info),
129 DEF_MDT_HDL(0,                          MDS_GET_INFO,     mdt_get_info),
130 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_GETSTATUS,    mdt_getstatus),
131 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETATTR,      mdt_getattr),
132 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, mdt_getattr_name),
133 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETXATTR,     mdt_getxattr),
134 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_STATFS,       mdt_statfs),
135 DEF_MDT_HDL(0           | MUTABOR,      MDS_REINT,        mdt_reint),
136 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
137 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
138 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_PIN,          mdt_pin),
139 DEF_MDT_HDL_VAR(0,                      MDS_SYNC,         mdt_sync),
140 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR,    mdt_is_subdir),
141 DEF_MDT_HDL(0,                          MDS_QUOTACHECK,   mdt_quotacheck),
142 DEF_MDT_HDL(0,                          MDS_QUOTACTL,     mdt_quotactl),
143 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_PROGRESS, mdt_hsm_progress),
144 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_CT_REGISTER,
145                                                 mdt_hsm_ct_register),
146 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_CT_UNREGISTER,
147                                                 mdt_hsm_ct_unregister),
148 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET,
149                                                 mdt_hsm_state_get),
150 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_SET,
151                                                 mdt_hsm_state_set),
152 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action),
153 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST, mdt_hsm_request),
154 DEF_MDT_HDL(HABEO_CORPUS|HABEO_REFERO,  MDS_SWAP_LAYOUTS, mdt_swap_layouts)
155 };
156
157 #define DEF_OBD_HDL(flags, name, fn)                                    \
158         DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL)
159
160 static struct mdt_handler mdt_obd_ops[] = {
161 DEF_OBD_HDL(0,                          OBD_PING,         mdt_obd_ping),
162 DEF_OBD_HDL(0,                          OBD_LOG_CANCEL,   mdt_obd_log_cancel),
163 DEF_OBD_HDL(0,                          OBD_QC_CALLBACK,  mdt_obd_qc_callback),
164 DEF_OBD_HDL(0,                          OBD_IDX_READ,     mdt_obd_idx_read)
165 };
166
167 #define DEF_DLM_HDL_VAR(flags, name, fn)                                \
168         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL)
169 #define DEF_DLM_HDL(flags, name, fn)                                    \
170         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name)
171
172 static struct mdt_handler mdt_dlm_ops[] = {
173 DEF_DLM_HDL    (HABEO_CLAVIS,           LDLM_ENQUEUE,     mdt_enqueue),
174 DEF_DLM_HDL_VAR(HABEO_CLAVIS,           LDLM_CONVERT,     mdt_convert),
175 DEF_DLM_HDL_VAR(0,                      LDLM_BL_CALLBACK, mdt_bl_callback),
176 DEF_DLM_HDL_VAR(0,                      LDLM_CP_CALLBACK, mdt_cp_callback)
177 };
178
179 #define DEF_LLOG_HDL(flags, name, fn)                                   \
180         DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL)
181
182 static struct mdt_handler mdt_llog_ops[] = {
183 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CREATE,        mdt_llog_create),
184 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_NEXT_BLOCK,    mdt_llog_next_block),
185 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_READ_HEADER,   mdt_llog_read_header),
186 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_WRITE_REC,     NULL),
187 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CLOSE,         NULL),
188 DEF_LLOG_HDL(0,         LLOG_ORIGIN_CONNECT,              NULL),
189 DEF_LLOG_HDL(0,         LLOG_CATINFO,                     NULL),
190 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_PREV_BLOCK,    mdt_llog_prev_block),
191 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_DESTROY,       mdt_llog_destroy),
192 };
193
194 #define DEF_SEC_HDL(flags, name, fn)                                    \
195         DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL)
196
197 static struct mdt_handler mdt_sec_ctx_ops[] = {
198 DEF_SEC_HDL(0,                          SEC_CTX_INIT,     mdt_sec_ctx_handle),
199 DEF_SEC_HDL(0,                          SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
200 DEF_SEC_HDL(0,                          SEC_CTX_FINI,     mdt_sec_ctx_handle)
201 };
202
203 #define DEF_QUOTA_HDL(flags, name, fn)                          \
204         DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name)
205
206 static struct mdt_handler mdt_quota_ops[] = {
207 DEF_QUOTA_HDL(HABEO_REFERO,             QUOTA_DQACQ,      mdt_quota_dqacq),
208 };
209
210 struct mdt_opc_slice mdt_regular_handlers[] = {
211         {
212                 .mos_opc_start  = MDS_GETATTR,
213                 .mos_opc_end    = MDS_LAST_OPC,
214                 .mos_hs         = mdt_mds_ops
215         },
216         {
217                 .mos_opc_start  = OBD_PING,
218                 .mos_opc_end    = OBD_LAST_OPC,
219                 .mos_hs         = mdt_obd_ops
220         },
221         {
222                 .mos_opc_start  = LDLM_ENQUEUE,
223                 .mos_opc_end    = LDLM_LAST_OPC,
224                 .mos_hs         = mdt_dlm_ops
225         },
226         {
227                 .mos_opc_start  = LLOG_ORIGIN_HANDLE_CREATE,
228                 .mos_opc_end    = LLOG_LAST_OPC,
229                 .mos_hs         = mdt_llog_ops
230         },
231         {
232                 .mos_opc_start  = SEC_CTX_INIT,
233                 .mos_opc_end    = SEC_LAST_OPC,
234                 .mos_hs         = mdt_sec_ctx_ops
235         },
236         {
237                 .mos_opc_start  = QUOTA_DQACQ,
238                 .mos_opc_end    = QUOTA_LAST_OPC,
239                 .mos_hs         = mdt_quota_ops
240         },
241         {
242                 .mos_hs         = NULL
243         }
244 };
245
246 /* Readpage/readdir handlers */
247 static struct mdt_handler mdt_readpage_ops[] = {
248 DEF_MDT_HDL(0,                  MDS_CONNECT,  mdt_connect),
249 DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage),
250 /* XXX: this is ugly and should be fixed one day, see mdc_close() for
251  * detailed comments. --umka */
252 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
253 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
254 };
255
256 static struct mdt_opc_slice mdt_readpage_handlers[] = {
257         {
258                 .mos_opc_start = MDS_GETATTR,
259                 .mos_opc_end   = MDS_LAST_OPC,
260                 .mos_hs = mdt_readpage_ops
261         },
262         {
263                 .mos_opc_start = OBD_FIRST_OPC,
264                 .mos_opc_end   = OBD_LAST_OPC,
265                 .mos_hs = mdt_obd_ops
266         },
267         {
268                 .mos_hs = NULL
269         }
270 };
271
272 /* Sequence service handlers */
273 #define DEF_SEQ_HDL(flags, name, fn)                                    \
274         DEFINE_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name)
275
276 static struct mdt_handler mdt_seq_ops[] = {
277 DEF_SEQ_HDL(0,                          SEQ_QUERY,        (void *)seq_query),
278 };
279
280 struct mdt_opc_slice mdt_seq_handlers[] = {
281         {
282                 .mos_opc_start = SEQ_QUERY,
283                 .mos_opc_end   = SEQ_LAST_OPC,
284                 .mos_hs = mdt_seq_ops
285         },
286         {
287                 .mos_hs = NULL
288         }
289 };
290
291 /* FID Location Database handlers */
292 #define DEF_FLD_HDL(flags, name, fn)                                    \
293         DEFINE_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name)
294
295 static struct mdt_handler mdt_fld_ops[] = {
296 DEF_FLD_HDL(0,                          FLD_QUERY,        (void *)fld_query),
297 };
298
299 struct mdt_opc_slice mdt_fld_handlers[] = {
300         {
301                 .mos_opc_start = FLD_QUERY,
302                 .mos_opc_end   = FLD_LAST_OPC,
303                 .mos_hs = mdt_fld_ops
304         },
305         {
306                 .mos_hs = NULL
307         }
308 };
309
310 /* Request with a format known in advance */
311 #define DEF_UPDATE_HDL(flags, name, fn)                                 \
312         DEFINE_RPC_HANDLER(UPDATE_OBJ, flags, name, fn, &RQF_ ## name)
313
314 #define target_handler mdt_handler
315 static struct target_handler out_ops[] = {
316         DEF_UPDATE_HDL(MUTABOR,         UPDATE_OBJ,     out_handle),
317 };
318
319 static struct mdt_opc_slice update_handlers[] = {
320         {
321                 .mos_opc_start = MDS_GETATTR,
322                 .mos_opc_end   = MDS_LAST_OPC,
323                 .mos_hs        = mdt_mds_ops
324         },
325         {
326                 .mos_opc_start = OBD_PING,
327                 .mos_opc_end   = OBD_LAST_OPC,
328                 .mos_hs        = mdt_obd_ops
329         },
330         {
331                 .mos_opc_start = LDLM_ENQUEUE,
332                 .mos_opc_end   = LDLM_LAST_OPC,
333                 .mos_hs        = mdt_dlm_ops
334         },
335         {
336                 .mos_opc_start = SEC_CTX_INIT,
337                 .mos_opc_end   = SEC_LAST_OPC,
338                 .mos_hs        = mdt_sec_ctx_ops
339         },
340         {
341                 .mos_opc_start = UPDATE_OBJ,
342                 .mos_opc_end   = UPDATE_LAST_OPC,
343                 .mos_hs        = out_ops
344         },
345         {
346                 .mos_hs        = NULL
347         }
348 };
349
350 static int mds_regular_handle(struct ptlrpc_request *req)
351 {
352         return mdt_handle_common(req, mdt_regular_handlers);
353 }
354
355 static int mds_readpage_handle(struct ptlrpc_request *req)
356 {
357         return mdt_handle_common(req, mdt_readpage_handlers);
358 }
359
360 static int mds_mdsc_handle(struct ptlrpc_request *req)
361 {
362         return mdt_handle_common(req, mdt_seq_handlers);
363 }
364
365 static int mdt_out_handle(struct ptlrpc_request *req)
366 {
367         return mdt_handle_common(req, update_handlers);
368 }
369
370 static int mds_mdss_handle(struct ptlrpc_request *req)
371 {
372         return mdt_handle_common(req, mdt_seq_handlers);
373 }
374
375 static int mds_fld_handle(struct ptlrpc_request *req)
376 {
377         return mdt_handle_common(req, mdt_fld_handlers);
378 }
379
380 /* device init/fini methods */
381 static void mds_stop_ptlrpc_service(struct mds_device *m)
382 {
383         ENTRY;
384         if (m->mds_regular_service != NULL) {
385                 ptlrpc_unregister_service(m->mds_regular_service);
386                 m->mds_regular_service = NULL;
387         }
388         if (m->mds_readpage_service != NULL) {
389                 ptlrpc_unregister_service(m->mds_readpage_service);
390                 m->mds_readpage_service = NULL;
391         }
392         if (m->mds_out_service != NULL) {
393                 ptlrpc_unregister_service(m->mds_out_service);
394                 m->mds_out_service = NULL;
395         }
396         if (m->mds_setattr_service != NULL) {
397                 ptlrpc_unregister_service(m->mds_setattr_service);
398                 m->mds_setattr_service = NULL;
399         }
400         if (m->mds_mdsc_service != NULL) {
401                 ptlrpc_unregister_service(m->mds_mdsc_service);
402                 m->mds_mdsc_service = NULL;
403         }
404         if (m->mds_mdss_service != NULL) {
405                 ptlrpc_unregister_service(m->mds_mdss_service);
406                 m->mds_mdss_service = NULL;
407         }
408         if (m->mds_fld_service != NULL) {
409                 ptlrpc_unregister_service(m->mds_fld_service);
410                 m->mds_fld_service = NULL;
411         }
412         EXIT;
413 }
414
415 static int mds_start_ptlrpc_service(struct mds_device *m)
416 {
417         static struct ptlrpc_service_conf conf;
418         struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
419         cfs_proc_dir_entry_t *procfs_entry;
420         int rc = 0;
421         ENTRY;
422
423         procfs_entry = obd->obd_proc_entry;
424         LASSERT(procfs_entry != NULL);
425
426         conf = (typeof(conf)) {
427                 .psc_name               = LUSTRE_MDT_NAME,
428                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
429                 .psc_buf                = {
430                         .bc_nbufs               = MDS_NBUFS,
431                         .bc_buf_size            = MDS_REG_BUFSIZE,
432                         .bc_req_max_size        = MDS_REG_MAXREQSIZE,
433                         .bc_rep_max_size        = MDS_REG_MAXREPSIZE,
434                         .bc_req_portal          = MDS_REQUEST_PORTAL,
435                         .bc_rep_portal          = MDC_REPLY_PORTAL,
436                 },
437                 /*
438                  * We'd like to have a mechanism to set this on a per-device
439                  * basis, but alas...
440                  */
441                 .psc_thr                = {
442                         .tc_thr_name            = LUSTRE_MDT_NAME,
443                         .tc_thr_factor          = MDS_THR_FACTOR,
444                         .tc_nthrs_init          = MDS_NTHRS_INIT,
445                         .tc_nthrs_base          = MDS_NTHRS_BASE,
446                         .tc_nthrs_max           = MDS_NTHRS_MAX,
447                         .tc_nthrs_user          = mds_num_threads,
448                         .tc_cpu_affinity        = 1,
449                         .tc_ctx_tags            = LCT_MD_THREAD,
450                 },
451                 .psc_cpt                = {
452                         .cc_pattern             = mds_num_cpts,
453                 },
454                 .psc_ops                = {
455                         .so_req_handler         = mds_regular_handle,
456                         .so_req_printer         = target_print_req,
457                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
458                 },
459         };
460         m->mds_regular_service = ptlrpc_register_service(&conf, procfs_entry);
461         if (IS_ERR(m->mds_regular_service)) {
462                 rc = PTR_ERR(m->mds_regular_service);
463                 CERROR("failed to start regular mdt service: %d\n", rc);
464                 m->mds_regular_service = NULL;
465
466                 RETURN(rc);
467         }
468
469         /*
470          * readpage service configuration. Parameters have to be adjusted,
471          * ideally.
472          */
473         memset(&conf, 0, sizeof(conf));
474         conf = (typeof(conf)) {
475                 .psc_name               = LUSTRE_MDT_NAME "_readpage",
476                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
477                 .psc_buf                = {
478                         .bc_nbufs               = MDS_NBUFS,
479                         .bc_buf_size            = MDS_BUFSIZE,
480                         .bc_req_max_size        = MDS_MAXREQSIZE,
481                         .bc_rep_max_size        = MDS_MAXREPSIZE,
482                         .bc_req_portal          = MDS_READPAGE_PORTAL,
483                         .bc_rep_portal          = MDC_REPLY_PORTAL,
484                 },
485                 .psc_thr                = {
486                         .tc_thr_name            = LUSTRE_MDT_NAME "_rdpg",
487                         .tc_thr_factor          = MDS_RDPG_THR_FACTOR,
488                         .tc_nthrs_init          = MDS_RDPG_NTHRS_INIT,
489                         .tc_nthrs_base          = MDS_RDPG_NTHRS_BASE,
490                         .tc_nthrs_max           = MDS_RDPG_NTHRS_MAX,
491                         .tc_nthrs_user          = mds_rdpg_num_threads,
492                         .tc_cpu_affinity        = 1,
493                         .tc_ctx_tags            = LCT_MD_THREAD,
494                 },
495                 .psc_cpt                = {
496                         .cc_pattern             = mds_rdpg_num_cpts,
497                 },
498                 .psc_ops                = {
499                         .so_req_handler         = mds_readpage_handle,
500                         .so_req_printer         = target_print_req,
501                 },
502         };
503         m->mds_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
504         if (IS_ERR(m->mds_readpage_service)) {
505                 rc = PTR_ERR(m->mds_readpage_service);
506                 CERROR("failed to start readpage service: %d\n", rc);
507                 m->mds_readpage_service = NULL;
508
509                 GOTO(err_mds_svc, rc);
510         }
511
512         /*
513          * setattr service configuration.
514          *
515          * XXX To keep the compatibility with old client(< 2.2), we need to
516          * preserve this portal for a certain time, it should be removed
517          * eventually. LU-617.
518          */
519         memset(&conf, 0, sizeof(conf));
520         conf = (typeof(conf)) {
521                 .psc_name               = LUSTRE_MDT_NAME "_setattr",
522                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
523                 .psc_buf                = {
524                         .bc_nbufs               = MDS_NBUFS,
525                         .bc_buf_size            = MDS_BUFSIZE,
526                         .bc_req_max_size        = MDS_MAXREQSIZE,
527                         .bc_rep_max_size        = MDS_LOV_MAXREPSIZE,
528                         .bc_req_portal          = MDS_SETATTR_PORTAL,
529                         .bc_rep_portal          = MDC_REPLY_PORTAL,
530                 },
531                 .psc_thr                = {
532                         .tc_thr_name            = LUSTRE_MDT_NAME "_attr",
533                         .tc_thr_factor          = MDS_SETA_THR_FACTOR,
534                         .tc_nthrs_init          = MDS_SETA_NTHRS_INIT,
535                         .tc_nthrs_base          = MDS_SETA_NTHRS_BASE,
536                         .tc_nthrs_max           = MDS_SETA_NTHRS_MAX,
537                         .tc_nthrs_user          = mds_attr_num_threads,
538                         .tc_cpu_affinity        = 1,
539                         .tc_ctx_tags            = LCT_MD_THREAD,
540                 },
541                 .psc_cpt                = {
542                         .cc_pattern             = mds_attr_num_cpts,
543                 },
544                 .psc_ops                = {
545                         .so_req_handler         = mds_regular_handle,
546                         .so_req_printer         = target_print_req,
547                         .so_hpreq_handler       = NULL,
548                 },
549         };
550         m->mds_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
551         if (IS_ERR(m->mds_setattr_service)) {
552                 rc = PTR_ERR(m->mds_setattr_service);
553                 CERROR("failed to start setattr service: %d\n", rc);
554                 m->mds_setattr_service = NULL;
555
556                 GOTO(err_mds_svc, rc);
557         }
558
559         /* Object update service */
560         conf = (typeof(conf)) {
561                 .psc_name               = LUSTRE_MDT_NAME "_out",
562                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
563                 .psc_buf                = {
564                         .bc_nbufs               = MDS_NBUFS,
565                         .bc_buf_size            = MDS_OUT_BUFSIZE,
566                         .bc_req_max_size        = MDS_OUT_MAXREQSIZE,
567                         .bc_rep_max_size        = MDS_OUT_MAXREPSIZE,
568                         .bc_req_portal          = MDS_MDS_PORTAL,
569                         .bc_rep_portal          = MDC_REPLY_PORTAL,
570                 },
571                 /*
572                  * We'd like to have a mechanism to set this on a per-device
573                  * basis, but alas...
574                  */
575                 .psc_thr                = {
576                         .tc_thr_name            = LUSTRE_MDT_NAME "_out",
577                         .tc_thr_factor          = MDS_THR_FACTOR,
578                         .tc_nthrs_init          = MDS_NTHRS_INIT,
579                         .tc_nthrs_base          = MDS_NTHRS_BASE,
580                         .tc_nthrs_max           = MDS_NTHRS_MAX,
581                         .tc_nthrs_user          = mds_num_threads,
582                         .tc_cpu_affinity        = 1,
583                         .tc_ctx_tags            = LCT_MD_THREAD,
584                 },
585                 .psc_cpt                = {
586                         .cc_pattern             = mds_num_cpts,
587                 },
588                 .psc_ops                = {
589                         .so_req_handler         = mdt_out_handle,
590                         .so_req_printer         = target_print_req,
591                         .so_hpreq_handler       = NULL,
592                 },
593         };
594         m->mds_out_service = ptlrpc_register_service(&conf, procfs_entry);
595         if (IS_ERR(m->mds_out_service)) {
596                 rc = PTR_ERR(m->mds_out_service);
597                 CERROR("failed to start out service: %d\n", rc);
598                 m->mds_out_service = NULL;
599                 GOTO(err_mds_svc, rc);
600         }
601
602         /*
603          * sequence controller service configuration
604          */
605         memset(&conf, 0, sizeof(conf));
606         conf = (typeof(conf)) {
607                 .psc_name               = LUSTRE_MDT_NAME "_seqs",
608                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
609                 .psc_buf                = {
610                         .bc_nbufs               = MDS_NBUFS,
611                         .bc_buf_size            = SEQ_BUFSIZE,
612                         .bc_req_max_size        = SEQ_MAXREQSIZE,
613                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
614                         .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
615                         .bc_rep_portal          = MDC_REPLY_PORTAL,
616                 },
617                 .psc_thr                = {
618                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqs",
619                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
620                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
621                         .tc_ctx_tags            = LCT_MD_THREAD,
622                 },
623                 .psc_ops                = {
624                         .so_req_handler         = mds_mdsc_handle,
625                         .so_req_printer         = target_print_req,
626                         .so_hpreq_handler       = NULL,
627                 },
628         };
629         m->mds_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
630         if (IS_ERR(m->mds_mdsc_service)) {
631                 rc = PTR_ERR(m->mds_mdsc_service);
632                 CERROR("failed to start seq controller service: %d\n", rc);
633                 m->mds_mdsc_service = NULL;
634
635                 GOTO(err_mds_svc, rc);
636         }
637
638         /*
639          * metadata sequence server service configuration
640          */
641         memset(&conf, 0, sizeof(conf));
642         conf = (typeof(conf)) {
643                 .psc_name               = LUSTRE_MDT_NAME "_seqm",
644                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
645                 .psc_buf                = {
646                         .bc_nbufs               = MDS_NBUFS,
647                         .bc_buf_size            = SEQ_BUFSIZE,
648                         .bc_req_max_size        = SEQ_MAXREQSIZE,
649                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
650                         .bc_req_portal          = SEQ_METADATA_PORTAL,
651                         .bc_rep_portal          = MDC_REPLY_PORTAL,
652                 },
653                 .psc_thr                = {
654                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqm",
655                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
656                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
657                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
658                 },
659                 .psc_ops                = {
660                         .so_req_handler         = mds_mdss_handle,
661                         .so_req_printer         = target_print_req,
662                         .so_hpreq_handler       = NULL,
663                 },
664         };
665         m->mds_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
666         if (IS_ERR(m->mds_mdss_service)) {
667                 rc = PTR_ERR(m->mds_mdss_service);
668                 CERROR("failed to start metadata seq server service: %d\n", rc);
669                 m->mds_mdss_service = NULL;
670
671                 GOTO(err_mds_svc, rc);
672         }
673
674         /* FLD service start */
675         memset(&conf, 0, sizeof(conf));
676         conf = (typeof(conf)) {
677                 .psc_name            = LUSTRE_MDT_NAME "_fld",
678                 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
679                 .psc_buf                = {
680                         .bc_nbufs               = MDS_NBUFS,
681                         .bc_buf_size            = FLD_BUFSIZE,
682                         .bc_req_max_size        = FLD_MAXREQSIZE,
683                         .bc_rep_max_size        = FLD_MAXREPSIZE,
684                         .bc_req_portal          = FLD_REQUEST_PORTAL,
685                         .bc_rep_portal          = MDC_REPLY_PORTAL,
686                 },
687                 .psc_thr                = {
688                         .tc_thr_name            = LUSTRE_MDT_NAME "_fld",
689                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
690                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
691                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD
692                 },
693                 .psc_ops                = {
694                         .so_req_handler         = mds_fld_handle,
695                         .so_req_printer         = target_print_req,
696                         .so_hpreq_handler       = NULL,
697                 },
698         };
699         m->mds_fld_service = ptlrpc_register_service(&conf, procfs_entry);
700         if (IS_ERR(m->mds_fld_service)) {
701                 rc = PTR_ERR(m->mds_fld_service);
702                 CERROR("failed to start fld service: %d\n", rc);
703                 m->mds_fld_service = NULL;
704
705                 GOTO(err_mds_svc, rc);
706         }
707
708         EXIT;
709 err_mds_svc:
710         if (rc)
711                 mds_stop_ptlrpc_service(m);
712
713         return rc;
714 }
715
716 static inline struct mds_device *mds_dev(struct lu_device *d)
717 {
718         return container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
719 }
720
721 static struct lu_device *mds_device_fini(const struct lu_env *env,
722                                          struct lu_device *d)
723 {
724         struct mds_device *m = mds_dev(d);
725         struct obd_device *obd = d->ld_obd;
726         ENTRY;
727
728         mds_stop_ptlrpc_service(m);
729         lprocfs_obd_cleanup(obd);
730         RETURN(NULL);
731 }
732
733 static struct lu_device *mds_device_free(const struct lu_env *env,
734                                          struct lu_device *d)
735 {
736         struct mds_device *m = mds_dev(d);
737         ENTRY;
738
739         md_device_fini(&m->mds_md_dev);
740         OBD_FREE_PTR(m);
741         RETURN(NULL);
742 }
743
744 static struct lu_device *mds_device_alloc(const struct lu_env *env,
745                                           struct lu_device_type *t,
746                                           struct lustre_cfg *cfg)
747 {
748         struct mds_device        *m;
749         struct obd_device        *obd;
750         struct lu_device          *l;
751         int rc;
752
753         OBD_ALLOC_PTR(m);
754         if (m == NULL)
755                 return ERR_PTR(-ENOMEM);
756
757         md_device_init(&m->mds_md_dev, t);
758         l = &m->mds_md_dev.md_lu_dev;
759
760         obd = class_name2obd(lustre_cfg_string(cfg, 0));
761         LASSERT(obd != NULL);
762
763         l->ld_obd = obd;
764         /* set this lu_device to obd, because error handling need it */
765         obd->obd_lu_dev = l;
766
767         rc = lprocfs_obd_setup(obd, lprocfs_mds_obd_vars);
768         if (rc != 0) {
769                 mds_device_free(env, l);
770                 l = ERR_PTR(rc);
771                 return l;
772         }
773
774         rc = mds_start_ptlrpc_service(m);
775
776         if (rc != 0) {
777                 mds_device_free(env, l);
778                 l = ERR_PTR(rc);
779                 return l;
780         }
781
782         return l;
783 }
784
785 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
786 LU_TYPE_INIT_FINI(mds, &mdt_thread_key);
787
788 static struct lu_device_type_operations mds_device_type_ops = {
789         .ldto_init = mds_type_init,
790         .ldto_fini = mds_type_fini,
791
792         .ldto_start = mds_type_start,
793         .ldto_stop  = mds_type_stop,
794
795         .ldto_device_alloc = mds_device_alloc,
796         .ldto_device_free  = mds_device_free,
797         .ldto_device_fini  = mds_device_fini
798 };
799
800 static struct lu_device_type mds_device_type = {
801         .ldt_tags     = LU_DEVICE_MD,
802         .ldt_name     = LUSTRE_MDS_NAME,
803         .ldt_ops      = &mds_device_type_ops,
804         .ldt_ctx_tags = LCT_MD_THREAD
805 };
806
807 static struct obd_ops mds_obd_device_ops = {
808         .o_owner           = THIS_MODULE,
809 };
810
811 int mds_mod_init(void)
812 {
813         int rc;
814
815         if (mdt_num_threads != 0 && mds_num_threads == 0) {
816                 LCONSOLE_INFO("mdt_num_threads module parameter is deprecated, "
817                               "use mds_num_threads instead or unset both for "
818                               "dynamic thread startup\n");
819                 mds_num_threads = mdt_num_threads;
820         }
821
822         rc = class_register_type(&mds_obd_device_ops, NULL,
823                                  lprocfs_mds_module_vars, LUSTRE_MDS_NAME,
824                                  &mds_device_type);
825         return rc;
826 }
827
828 void mds_mod_exit(void)
829 {
830         class_unregister_type(LUSTRE_MDS_NAME);
831 }