Whamcloud - gitweb
LU-3467 seq: unified SEQ handler
[fs/lustre-release.git] / lustre / mdt / mdt_mds.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2013, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_mds.c
32  *
33  * Lustre Metadata Service Layer
34  *
35  * Author: Di Wang <di.wang@whamcloud.com>
36  **/
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 #include <obd_support.h>
43 /* struct ptlrpc_request */
44 #include <lustre_net.h>
45 /* struct obd_export */
46 #include <lustre_export.h>
47 /* struct obd_device */
48 #include <obd.h>
49 /* lu2dt_dev() */
50 #include <dt_object.h>
51 #include <lustre_mds.h>
52 #include "mdt_internal.h"
53 #include <lustre_quota.h>
54 #include <lustre_acl.h>
55 #include <lustre_param.h>
56
/*
 * State of one MDS device: the embedded md_device plus one handle for every
 * ptlrpc service that mds_start_ptlrpc_service() registers.
 * mds_stop_ptlrpc_service() treats a NULL handle as "not registered".
 */
57 struct mds_device {
58         /* super-class */
59         struct md_device           mds_md_dev;
           /* MDS_REQUEST_PORTAL, handled by mds_regular_handle() */
60         struct ptlrpc_service     *mds_regular_service;
           /* MDS_READPAGE_PORTAL, handled by mds_readpage_handle() */
61         struct ptlrpc_service     *mds_readpage_service;
           /* OUT_PORTAL (object update), handled by tgt_request_handle() */
62         struct ptlrpc_service     *mds_out_service;
           /* MDS_SETATTR_PORTAL, handled by mds_regular_handle() */
63         struct ptlrpc_service     *mds_setattr_service;
           /* SEQ_CONTROLLER_PORTAL, handled by tgt_request_handle() */
64         struct ptlrpc_service     *mds_mdsc_service;
           /* SEQ_METADATA_PORTAL, handled by tgt_request_handle() */
65         struct ptlrpc_service     *mds_mdss_service;
           /* FLD_REQUEST_PORTAL, handled by tgt_request_handle() */
66         struct ptlrpc_service     *mds_fld_service;
67 };
68
69 /*
70  * Initialized in mdt_mod_init().
71  */
/*
 * Module parameters controlling thread counts and CPU partition patterns of
 * the services started in mds_start_ptlrpc_service().
 *
 * mdt_num_threads is the deprecated spelling: mds_mod_init() copies it into
 * mds_num_threads when it is non-zero and mds_num_threads is zero.
 */
72 static unsigned long mdt_num_threads;
73 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
74                 "number of MDS service threads to start "
75                 "(deprecated in favor of mds_num_threads)");
76
/* used as tc_nthrs_user for both the regular and the "out" service */
77 static unsigned long mds_num_threads;
78 CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
79                 "number of MDS service threads to start");
80
/* used as cc_pattern for both the regular and the "out" service */
81 static char *mds_num_cpts;
82 CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
83                 "CPU partitions MDS threads should run on");
84
/* tc_nthrs_user of the readpage service */
85 static unsigned long mds_rdpg_num_threads;
86 CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
87                 "number of MDS readpage service threads to start");
88
/* cc_pattern of the readpage service */
89 static char *mds_rdpg_num_cpts;
90 CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
91                 "CPU partitions MDS readpage threads should run on");
92
93 /* NB: these two should be removed along with setattr service in the future */
94 static unsigned long mds_attr_num_threads;
95 CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
96                 "number of MDS setattr service threads to start");
97
98 static char *mds_attr_num_cpts;
99 CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
100                 "CPU partitions MDS setattr threads should run on");
101
/*
 * Expand to one designated initializer of a struct mdt_handler table.
 *
 * The entry is stored at index (opc - base), so a table built with this macro
 * can be indexed by opcode relative to its first opcode.  The handler name is
 * the stringified opcode and the fail id is OBD_FAIL_<opc>_NET, formed by
 * token pasting; such a constant must therefore exist for every opcode used.
 *
 * base  - first opcode of the table, subtracted to form the array index
 * flags - stored in mh_flags
 * opc   - opcode served by this entry
 * fn    - handler function stored in mh_act (may be NULL)
 * fmt   - request format stored in mh_fmt (may be NULL)
 */
102 #define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt)                   \
103 [opc - base] = {                                                        \
104         .mh_name        = #opc,                                         \
105         .mh_fail_id     = OBD_FAIL_ ## opc ## _NET,                     \
106         .mh_opc         = opc,                                          \
107         .mh_flags       = flags,                                        \
108         .mh_act         = fn,                                           \
109         .mh_fmt         = fmt                                           \
110 }
111
/*
 * Entry builders for MDS opcode tables; both index relative to MDS_GETATTR.
 * DEF_MDT_HDL derives the format as &RQF_<name>, DEF_MDT_HDL_VAR leaves the
 * format NULL.
 */
112 /* Request with a format known in advance */
113 #define DEF_MDT_HDL(flags, name, fn)                                    \
114         DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name)
115
116 /* Request with a format we do not yet know */
117 #define DEF_MDT_HDL_VAR(flags, name, fn)                                \
118         DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL)
119
120 /* Map one non-standard request format handler.  This should probably get
121  * a common OBD_SET_INFO RPC opcode instead of this mismatch. */
/* Lets DEF_MDT_HDL(..., MDS_SET_INFO, ...) paste to an existing format name. */
122 #define RQF_MDS_SET_INFO RQF_OBD_SET_INFO
123
/*
 * Handlers for MDS_* opcodes, indexed by (opcode - MDS_GETATTR).
 * Used by the MDS_GETATTR..MDS_LAST_OPC slice of mdt_regular_handlers.
 * Every entry has a fixed request format except MDS_SYNC, which is declared
 * with DEF_MDT_HDL_VAR and therefore has a NULL format.
 */
124 static struct mdt_handler mdt_mds_ops[] = {
125 DEF_MDT_HDL(0,                          MDS_CONNECT,      mdt_connect),
126 DEF_MDT_HDL(0,                          MDS_DISCONNECT,   mdt_disconnect),
127 DEF_MDT_HDL(0,                          MDS_SET_INFO,     mdt_set_info),
128 DEF_MDT_HDL(0,                          MDS_GET_INFO,     mdt_get_info),
129 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_GETSTATUS,    mdt_getstatus),
130 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETATTR,      mdt_getattr),
131 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, mdt_getattr_name),
132 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETXATTR,     mdt_getxattr),
133 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_STATFS,       mdt_statfs),
134 DEF_MDT_HDL(0           | MUTABOR,      MDS_REINT,        mdt_reint),
135 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
136 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
137 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_PIN,          mdt_pin),
138 DEF_MDT_HDL_VAR(0,                      MDS_SYNC,         mdt_sync),
139 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR,    mdt_is_subdir),
140 DEF_MDT_HDL(0,                          MDS_QUOTACHECK,   mdt_quotacheck),
141 DEF_MDT_HDL(0,                          MDS_QUOTACTL,     mdt_quotactl),
142 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_PROGRESS, mdt_hsm_progress),
143 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_CT_REGISTER,
144                                                 mdt_hsm_ct_register),
145 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_CT_UNREGISTER,
146                                                 mdt_hsm_ct_unregister),
147 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET,
148                                                 mdt_hsm_state_get),
149 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_SET,
150                                                 mdt_hsm_state_set),
151 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action),
152 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST, mdt_hsm_request),
153 DEF_MDT_HDL(HABEO_CORPUS|HABEO_REFERO,  MDS_SWAP_LAYOUTS, mdt_swap_layouts)
154 };
155
/* Entry builder for OBD_* opcodes: index relative to OBD_PING, NULL format. */
156 #define DEF_OBD_HDL(flags, name, fn)                                    \
157         DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL)
158
/*
 * Handlers for OBD_* opcodes, indexed by (opcode - OBD_PING).
 * Shared by mdt_regular_handlers and mdt_readpage_handlers.
 */
159 static struct mdt_handler mdt_obd_ops[] = {
160 DEF_OBD_HDL(0,                          OBD_PING,         mdt_obd_ping),
161 DEF_OBD_HDL(0,                          OBD_LOG_CANCEL,   mdt_obd_log_cancel),
162 DEF_OBD_HDL(0,                          OBD_QC_CALLBACK,  mdt_obd_qc_callback),
163 DEF_OBD_HDL(0,                          OBD_IDX_READ,     mdt_obd_idx_read)
164 };
165
/*
 * Entry builders for LDLM_* opcodes, both indexed relative to LDLM_ENQUEUE.
 * DEF_DLM_HDL_VAR leaves the format NULL, DEF_DLM_HDL uses &RQF_<name>.
 */
166 #define DEF_DLM_HDL_VAR(flags, name, fn)                                \
167         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL)
168 #define DEF_DLM_HDL(flags, name, fn)                                    \
169         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name)
170
/*
 * Handlers for LDLM_* opcodes, indexed by (opcode - LDLM_ENQUEUE).
 * Only LDLM_ENQUEUE has a fixed request format.
 */
171 static struct mdt_handler mdt_dlm_ops[] = {
172 DEF_DLM_HDL    (HABEO_CLAVIS,           LDLM_ENQUEUE,     mdt_enqueue),
173 DEF_DLM_HDL_VAR(HABEO_CLAVIS,           LDLM_CONVERT,     mdt_convert),
174 DEF_DLM_HDL_VAR(0,                      LDLM_BL_CALLBACK, mdt_bl_callback),
175 DEF_DLM_HDL_VAR(0,                      LDLM_CP_CALLBACK, mdt_cp_callback)
176 };
177
/*
 * Entry builder for LLOG_* opcodes: index relative to
 * LLOG_ORIGIN_HANDLE_CREATE, NULL format.
 */
178 #define DEF_LLOG_HDL(flags, name, fn)                                   \
179         DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL)
180
/*
 * Handlers for LLOG_* opcodes, indexed by
 * (opcode - LLOG_ORIGIN_HANDLE_CREATE).  WRITE_REC, CLOSE, CONNECT and
 * CATINFO are listed with a NULL handler function.
 */
181 static struct mdt_handler mdt_llog_ops[] = {
182 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CREATE,        mdt_llog_create),
183 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_NEXT_BLOCK,    mdt_llog_next_block),
184 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_READ_HEADER,   mdt_llog_read_header),
185 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_WRITE_REC,     NULL),
186 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CLOSE,         NULL),
187 DEF_LLOG_HDL(0,         LLOG_ORIGIN_CONNECT,              NULL),
188 DEF_LLOG_HDL(0,         LLOG_CATINFO,                     NULL),
189 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_PREV_BLOCK,    mdt_llog_prev_block),
190 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_DESTROY,       mdt_llog_destroy),
191 };
192
/* Entry builder for SEC_CTX_* opcodes: index relative to SEC_CTX_INIT, NULL format. */
193 #define DEF_SEC_HDL(flags, name, fn)                                    \
194         DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL)
195
/*
 * Handlers for SEC_CTX_* opcodes, indexed by (opcode - SEC_CTX_INIT).
 * All three opcodes are routed to mdt_sec_ctx_handle().
 */
196 static struct mdt_handler mdt_sec_ctx_ops[] = {
197 DEF_SEC_HDL(0,                          SEC_CTX_INIT,     mdt_sec_ctx_handle),
198 DEF_SEC_HDL(0,                          SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
199 DEF_SEC_HDL(0,                          SEC_CTX_FINI,     mdt_sec_ctx_handle)
200 };
201
/* Entry builder for QUOTA_* opcodes: index relative to QUOTA_DQACQ, format &RQF_<name>. */
202 #define DEF_QUOTA_HDL(flags, name, fn)                          \
203         DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name)
204
/* Handlers for QUOTA_* opcodes, indexed by (opcode - QUOTA_DQACQ). */
205 static struct mdt_handler mdt_quota_ops[] = {
206 DEF_QUOTA_HDL(HABEO_REFERO,             QUOTA_DQACQ,      mdt_quota_dqacq),
207 };
208
/*
 * Opcode slices passed to mdt_handle_common() by mds_regular_handle().
 * Each slice pairs an opcode range (mos_opc_start .. mos_opc_end) with the
 * handler table built relative to that range's first opcode.  The list is
 * terminated by an entry whose mos_hs is NULL.
 *
 * Not static, unlike mdt_readpage_handlers, so it is visible outside this
 * file.
 */
209 struct mdt_opc_slice mdt_regular_handlers[] = {
210         {
211                 .mos_opc_start  = MDS_GETATTR,
212                 .mos_opc_end    = MDS_LAST_OPC,
213                 .mos_hs         = mdt_mds_ops
214         },
215         {
216                 .mos_opc_start  = OBD_PING,
217                 .mos_opc_end    = OBD_LAST_OPC,
218                 .mos_hs         = mdt_obd_ops
219         },
220         {
221                 .mos_opc_start  = LDLM_ENQUEUE,
222                 .mos_opc_end    = LDLM_LAST_OPC,
223                 .mos_hs         = mdt_dlm_ops
224         },
225         {
226                 .mos_opc_start  = LLOG_ORIGIN_HANDLE_CREATE,
227                 .mos_opc_end    = LLOG_LAST_OPC,
228                 .mos_hs         = mdt_llog_ops
229         },
230         {
231                 .mos_opc_start  = SEC_CTX_INIT,
232                 .mos_opc_end    = SEC_LAST_OPC,
233                 .mos_hs         = mdt_sec_ctx_ops
234         },
235         {
236                 .mos_opc_start  = QUOTA_DQACQ,
237                 .mos_opc_end    = QUOTA_LAST_OPC,
238                 .mos_hs         = mdt_quota_ops
239         },
240         {
                    /* terminator */
241                 .mos_hs         = NULL
242         }
243 };
244
245 /* Readpage/readdir handlers */
/*
 * Built with DEF_MDT_HDL, so entries are indexed by (opcode - MDS_GETATTR)
 * just like mdt_mds_ops, but only CONNECT, READPAGE, CLOSE and DONE_WRITING
 * are populated.
 */
246 static struct mdt_handler mdt_readpage_ops[] = {
247 DEF_MDT_HDL(0,                  MDS_CONNECT,  mdt_connect),
248 DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage),
249 /* XXX: this is ugly and should be fixed one day, see mdc_close() for
250  * detailed comments. --umka */
251 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
252 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
253 };
254
/*
 * Opcode slices passed to mdt_handle_common() by mds_readpage_handle():
 * the reduced MDS table plus the shared OBD table, terminated by an entry
 * whose mos_hs is NULL.
 */
255 static struct mdt_opc_slice mdt_readpage_handlers[] = {
256         {
257                 .mos_opc_start = MDS_GETATTR,
258                 .mos_opc_end   = MDS_LAST_OPC,
259                 .mos_hs = mdt_readpage_ops
260         },
261         {
                    /*
                     * NOTE(review): mdt_obd_ops is indexed relative to
                     * OBD_PING and mdt_regular_handlers starts this slice at
                     * OBD_PING; this presumes OBD_FIRST_OPC == OBD_PING --
                     * confirm against the opcode definitions.
                     */
262                 .mos_opc_start = OBD_FIRST_OPC,
263                 .mos_opc_end   = OBD_LAST_OPC,
264                 .mos_hs = mdt_obd_ops
265         },
266         {
267                 .mos_hs = NULL
268         }
269 };
270
/*
 * Request handler installed as so_req_handler of the regular and the setattr
 * services: dispatches \a req through mdt_regular_handlers and returns
 * whatever mdt_handle_common() returns.
 */
271 static int mds_regular_handle(struct ptlrpc_request *req)
272 {
273         return mdt_handle_common(req, mdt_regular_handlers);
274 }
275
/*
 * Request handler installed as so_req_handler of the readpage service:
 * dispatches \a req through mdt_readpage_handlers and returns whatever
 * mdt_handle_common() returns.
 */
276 static int mds_readpage_handle(struct ptlrpc_request *req)
277 {
278         return mdt_handle_common(req, mdt_readpage_handlers);
279 }
280
281 /* device init/fini methods */
/*
 * Unregister every ptlrpc service of \a m that is currently registered.
 *
 * Each handle is checked for NULL first and reset to NULL after
 * ptlrpc_unregister_service(), so the function copes with a partially
 * started device (it is the error path of mds_start_ptlrpc_service()) and
 * with being called again.
 */
282 static void mds_stop_ptlrpc_service(struct mds_device *m)
283 {
284         ENTRY;
285         if (m->mds_regular_service != NULL) {
286                 ptlrpc_unregister_service(m->mds_regular_service);
287                 m->mds_regular_service = NULL;
288         }
289         if (m->mds_readpage_service != NULL) {
290                 ptlrpc_unregister_service(m->mds_readpage_service);
291                 m->mds_readpage_service = NULL;
292         }
293         if (m->mds_out_service != NULL) {
294                 ptlrpc_unregister_service(m->mds_out_service);
295                 m->mds_out_service = NULL;
296         }
297         if (m->mds_setattr_service != NULL) {
298                 ptlrpc_unregister_service(m->mds_setattr_service);
299                 m->mds_setattr_service = NULL;
300         }
301         if (m->mds_mdsc_service != NULL) {
302                 ptlrpc_unregister_service(m->mds_mdsc_service);
303                 m->mds_mdsc_service = NULL;
304         }
305         if (m->mds_mdss_service != NULL) {
306                 ptlrpc_unregister_service(m->mds_mdss_service);
307                 m->mds_mdss_service = NULL;
308         }
309         if (m->mds_fld_service != NULL) {
310                 ptlrpc_unregister_service(m->mds_fld_service);
311                 m->mds_fld_service = NULL;
312         }
313         EXIT;
314 }
315
/*
 * Register every ptlrpc service owned by the MDS device \a m.
 *
 * Services are registered in this order: regular, readpage, setattr, out
 * (object update), sequence controller, metadata sequence server and FLD.
 * Each one is described by refilling the single static \a conf and passing
 * it to ptlrpc_register_service() together with the obd's proc entry.
 *
 * Returns 0 when all services are registered.  Otherwise returns the
 * PTR_ERR() of the first failed registration; the failed handle is reset to
 * NULL and the services registered before it are stopped again through
 * mds_stop_ptlrpc_service().
 */
316 static int mds_start_ptlrpc_service(struct mds_device *m)
317 {
            /*
             * NOTE(review): static storage means concurrent callers would
             * share this buffer -- confirm device setup is serialized.
             */
318         static struct ptlrpc_service_conf conf;
319         struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
320         cfs_proc_dir_entry_t *procfs_entry;
321         int rc = 0;
322         ENTRY;
323
324         procfs_entry = obd->obd_proc_entry;
325         LASSERT(procfs_entry != NULL);
326
            /* regular service: MDS_REQUEST_PORTAL, the only one with a
             * high-priority request handler */
327         conf = (typeof(conf)) {
328                 .psc_name               = LUSTRE_MDT_NAME,
329                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
330                 .psc_buf                = {
331                         .bc_nbufs               = MDS_NBUFS,
332                         .bc_buf_size            = MDS_REG_BUFSIZE,
333                         .bc_req_max_size        = MDS_REG_MAXREQSIZE,
334                         .bc_rep_max_size        = MDS_REG_MAXREPSIZE,
335                         .bc_req_portal          = MDS_REQUEST_PORTAL,
336                         .bc_rep_portal          = MDC_REPLY_PORTAL,
337                 },
338                 /*
339                  * We'd like to have a mechanism to set this on a per-device
340                  * basis, but alas...
341                  */
342                 .psc_thr                = {
343                         .tc_thr_name            = LUSTRE_MDT_NAME,
344                         .tc_thr_factor          = MDS_THR_FACTOR,
345                         .tc_nthrs_init          = MDS_NTHRS_INIT,
346                         .tc_nthrs_base          = MDS_NTHRS_BASE,
347                         .tc_nthrs_max           = MDS_NTHRS_MAX,
348                         .tc_nthrs_user          = mds_num_threads,
349                         .tc_cpu_affinity        = 1,
350                         .tc_ctx_tags            = LCT_MD_THREAD,
351                 },
352                 .psc_cpt                = {
353                         .cc_pattern             = mds_num_cpts,
354                 },
355                 .psc_ops                = {
356                         .so_req_handler         = mds_regular_handle,
357                         .so_req_printer         = target_print_req,
358                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
359                 },
360         };
361         m->mds_regular_service = ptlrpc_register_service(&conf, procfs_entry);
362         if (IS_ERR(m->mds_regular_service)) {
363                 rc = PTR_ERR(m->mds_regular_service);
364                 CERROR("failed to start regular mdt service: %d\n", rc);
365                 m->mds_regular_service = NULL;
366
                    /* nothing else is registered yet, so return directly
                     * instead of going through err_mds_svc */
367                 RETURN(rc);
368         }
369
370         /*
371          * readpage service configuration. Parameters have to be adjusted,
372          * ideally.
373          */
374         memset(&conf, 0, sizeof(conf));
375         conf = (typeof(conf)) {
376                 .psc_name               = LUSTRE_MDT_NAME "_readpage",
377                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
378                 .psc_buf                = {
379                         .bc_nbufs               = MDS_NBUFS,
380                         .bc_buf_size            = MDS_BUFSIZE,
381                         .bc_req_max_size        = MDS_MAXREQSIZE,
382                         .bc_rep_max_size        = MDS_MAXREPSIZE,
383                         .bc_req_portal          = MDS_READPAGE_PORTAL,
384                         .bc_rep_portal          = MDC_REPLY_PORTAL,
385                 },
386                 .psc_thr                = {
387                         .tc_thr_name            = LUSTRE_MDT_NAME "_rdpg",
388                         .tc_thr_factor          = MDS_RDPG_THR_FACTOR,
389                         .tc_nthrs_init          = MDS_RDPG_NTHRS_INIT,
390                         .tc_nthrs_base          = MDS_RDPG_NTHRS_BASE,
391                         .tc_nthrs_max           = MDS_RDPG_NTHRS_MAX,
392                         .tc_nthrs_user          = mds_rdpg_num_threads,
393                         .tc_cpu_affinity        = 1,
394                         .tc_ctx_tags            = LCT_MD_THREAD,
395                 },
396                 .psc_cpt                = {
397                         .cc_pattern             = mds_rdpg_num_cpts,
398                 },
399                 .psc_ops                = {
400                         .so_req_handler         = mds_readpage_handle,
401                         .so_req_printer         = target_print_req,
402                 },
403         };
404         m->mds_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
405         if (IS_ERR(m->mds_readpage_service)) {
406                 rc = PTR_ERR(m->mds_readpage_service);
407                 CERROR("failed to start readpage service: %d\n", rc);
408                 m->mds_readpage_service = NULL;
409
410                 GOTO(err_mds_svc, rc);
411         }
412
413         /*
414          * setattr service configuration.
415          *
416          * XXX To keep the compatibility with old client(< 2.2), we need to
417          * preserve this portal for a certain time, it should be removed
418          * eventually. LU-617.
419          */
420         memset(&conf, 0, sizeof(conf));
421         conf = (typeof(conf)) {
422                 .psc_name               = LUSTRE_MDT_NAME "_setattr",
423                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
424                 .psc_buf                = {
425                         .bc_nbufs               = MDS_NBUFS,
426                         .bc_buf_size            = MDS_BUFSIZE,
427                         .bc_req_max_size        = MDS_MAXREQSIZE,
428                         .bc_rep_max_size        = MDS_LOV_MAXREPSIZE,
429                         .bc_req_portal          = MDS_SETATTR_PORTAL,
430                         .bc_rep_portal          = MDC_REPLY_PORTAL,
431                 },
432                 .psc_thr                = {
433                         .tc_thr_name            = LUSTRE_MDT_NAME "_attr",
434                         .tc_thr_factor          = MDS_SETA_THR_FACTOR,
435                         .tc_nthrs_init          = MDS_SETA_NTHRS_INIT,
436                         .tc_nthrs_base          = MDS_SETA_NTHRS_BASE,
437                         .tc_nthrs_max           = MDS_SETA_NTHRS_MAX,
438                         .tc_nthrs_user          = mds_attr_num_threads,
439                         .tc_cpu_affinity        = 1,
440                         .tc_ctx_tags            = LCT_MD_THREAD,
441                 },
442                 .psc_cpt                = {
443                         .cc_pattern             = mds_attr_num_cpts,
444                 },
445                 .psc_ops                = {
                            /* same dispatcher as the regular service */
446                         .so_req_handler         = mds_regular_handle,
447                         .so_req_printer         = target_print_req,
448                         .so_hpreq_handler       = NULL,
449                 },
450         };
451         m->mds_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
452         if (IS_ERR(m->mds_setattr_service)) {
453                 rc = PTR_ERR(m->mds_setattr_service);
454                 CERROR("failed to start setattr service: %d\n", rc);
455                 m->mds_setattr_service = NULL;
456
457                 GOTO(err_mds_svc, rc);
458         }
459
            /*
             * No memset() precedes this block, unlike the other services.
             * The compound-literal assignment still sets every member of
             * conf, with members not named below becoming zero.
             */
460         /* Object update service */
461         conf = (typeof(conf)) {
462                 .psc_name               = LUSTRE_MDT_NAME "_out",
463                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
464                 .psc_buf                = {
465                         .bc_nbufs               = MDS_NBUFS,
466                         .bc_buf_size            = OUT_BUFSIZE,
467                         .bc_req_max_size        = OUT_MAXREQSIZE,
468                         .bc_rep_max_size        = OUT_MAXREPSIZE,
469                         .bc_req_portal          = OUT_PORTAL,
470                         .bc_rep_portal          = MDC_REPLY_PORTAL,
471                 },
472                 /*
473                  * We'd like to have a mechanism to set this on a per-device
474                  * basis, but alas...
475                  */
476                 .psc_thr                = {
477                         .tc_thr_name            = LUSTRE_MDT_NAME "_out",
478                         .tc_thr_factor          = MDS_THR_FACTOR,
479                         .tc_nthrs_init          = MDS_NTHRS_INIT,
480                         .tc_nthrs_base          = MDS_NTHRS_BASE,
481                         .tc_nthrs_max           = MDS_NTHRS_MAX,
482                         .tc_nthrs_user          = mds_num_threads,
483                         .tc_cpu_affinity        = 1,
484                         .tc_ctx_tags            = LCT_MD_THREAD,
485                 },
486                 .psc_cpt                = {
487                         .cc_pattern             = mds_num_cpts,
488                 },
489                 .psc_ops                = {
490                         .so_req_handler         = tgt_request_handle,
491                         .so_req_printer         = target_print_req,
492                         .so_hpreq_handler       = NULL,
493                 },
494         };
495         m->mds_out_service = ptlrpc_register_service(&conf, procfs_entry);
496         if (IS_ERR(m->mds_out_service)) {
497                 rc = PTR_ERR(m->mds_out_service);
498                 CERROR("failed to start out service: %d\n", rc);
499                 m->mds_out_service = NULL;
500                 GOTO(err_mds_svc, rc);
501         }
502
503         /*
504          * sequence controller service configuration
505          */
            /* this and the two services below set no thread factor, user
             * thread count, CPU affinity or CPT pattern */
506         memset(&conf, 0, sizeof(conf));
507         conf = (typeof(conf)) {
508                 .psc_name               = LUSTRE_MDT_NAME "_seqs",
509                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
510                 .psc_buf                = {
511                         .bc_nbufs               = MDS_NBUFS,
512                         .bc_buf_size            = SEQ_BUFSIZE,
513                         .bc_req_max_size        = SEQ_MAXREQSIZE,
514                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
515                         .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
516                         .bc_rep_portal          = MDC_REPLY_PORTAL,
517                 },
518                 .psc_thr                = {
519                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqs",
520                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
521                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
522                         .tc_ctx_tags            = LCT_MD_THREAD,
523                 },
524                 .psc_ops                = {
525                         .so_req_handler         = tgt_request_handle,
526                         .so_req_printer         = target_print_req,
527                         .so_hpreq_handler       = NULL,
528                 },
529         };
530         m->mds_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
531         if (IS_ERR(m->mds_mdsc_service)) {
532                 rc = PTR_ERR(m->mds_mdsc_service);
533                 CERROR("failed to start seq controller service: %d\n", rc);
534                 m->mds_mdsc_service = NULL;
535
536                 GOTO(err_mds_svc, rc);
537         }
538
539         /*
540          * metadata sequence server service configuration
541          */
542         memset(&conf, 0, sizeof(conf));
543         conf = (typeof(conf)) {
544                 .psc_name               = LUSTRE_MDT_NAME "_seqm",
545                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
546                 .psc_buf                = {
547                         .bc_nbufs               = MDS_NBUFS,
548                         .bc_buf_size            = SEQ_BUFSIZE,
549                         .bc_req_max_size        = SEQ_MAXREQSIZE,
550                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
551                         .bc_req_portal          = SEQ_METADATA_PORTAL,
552                         .bc_rep_portal          = MDC_REPLY_PORTAL,
553                 },
554                 .psc_thr                = {
555                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqm",
556                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
557                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
558                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
559                 },
560                 .psc_ops                = {
561                         .so_req_handler         = tgt_request_handle,
562                         .so_req_printer         = target_print_req,
563                         .so_hpreq_handler       = NULL,
564                 },
565         };
566         m->mds_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
567         if (IS_ERR(m->mds_mdss_service)) {
568                 rc = PTR_ERR(m->mds_mdss_service);
569                 CERROR("failed to start metadata seq server service: %d\n", rc);
570                 m->mds_mdss_service = NULL;
571
572                 GOTO(err_mds_svc, rc);
573         }
574
575         /* FLD service start */
576         memset(&conf, 0, sizeof(conf));
577         conf = (typeof(conf)) {
578                 .psc_name            = LUSTRE_MDT_NAME "_fld",
579                 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
580                 .psc_buf                = {
581                         .bc_nbufs               = MDS_NBUFS,
582                         .bc_buf_size            = FLD_BUFSIZE,
583                         .bc_req_max_size        = FLD_MAXREQSIZE,
584                         .bc_rep_max_size        = FLD_MAXREPSIZE,
585                         .bc_req_portal          = FLD_REQUEST_PORTAL,
586                         .bc_rep_portal          = MDC_REPLY_PORTAL,
587                 },
588                 .psc_thr                = {
589                         .tc_thr_name            = LUSTRE_MDT_NAME "_fld",
590                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
591                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
592                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD,
593                 },
594                 .psc_ops                = {
595                         .so_req_handler         = tgt_request_handle,
596                         .so_req_printer         = target_print_req,
597                         .so_hpreq_handler       = NULL,
598                 },
599         };
600         m->mds_fld_service = ptlrpc_register_service(&conf, procfs_entry);
601         if (IS_ERR(m->mds_fld_service)) {
602                 rc = PTR_ERR(m->mds_fld_service);
603                 CERROR("failed to start fld service: %d\n", rc);
604                 m->mds_fld_service = NULL;
605
606                 GOTO(err_mds_svc, rc);
607         }
608
609         EXIT;
        /* also reached on success, where rc == 0 and nothing is stopped */
610 err_mds_svc:
611         if (rc)
612                 mds_stop_ptlrpc_service(m);
613
614         return rc;
615 }
616
/*
 * Map the lu_device \a d back to the mds_device that embeds it as
 * mds_md_dev.md_lu_dev.
 */
617 static inline struct mds_device *mds_dev(struct lu_device *d)
618 {
619         return container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
620 }
621
/*
 * ldto_device_fini method: stop all ptlrpc services of the device and then
 * release the obd's lprocfs state.  \a env is unused.  Always returns NULL.
 */
622 static struct lu_device *mds_device_fini(const struct lu_env *env,
623                                          struct lu_device *d)
624 {
625         struct mds_device *m = mds_dev(d);
626         struct obd_device *obd = d->ld_obd;
627         ENTRY;
628
629         mds_stop_ptlrpc_service(m);
630         lprocfs_obd_cleanup(obd);
631         RETURN(NULL);
632 }
633
/*
 * ldto_device_free method: finalize the embedded md_device and free the
 * mds_device that contains \a d.  It does not stop services or touch
 * lprocfs; mds_device_fini() does that.  \a env is unused.  Always returns
 * NULL.
 */
634 static struct lu_device *mds_device_free(const struct lu_env *env,
635                                          struct lu_device *d)
636 {
637         struct mds_device *m = mds_dev(d);
638         ENTRY;
639
640         md_device_fini(&m->mds_md_dev);
641         OBD_FREE_PTR(m);
642         RETURN(NULL);
643 }
644
/*
 * ldto_device_alloc method: allocate an mds_device, bind it to the obd named
 * by string 0 of \a cfg, set up the obd's lprocfs entries and start all
 * ptlrpc services.
 *
 * Returns the embedded lu_device on success.  On failure returns
 * ERR_PTR(-ENOMEM) if the allocation failed, or ERR_PTR(rc) carrying the
 * error of lprocfs_obd_setup() or mds_start_ptlrpc_service(); everything set
 * up before the failing step is undone first.
 *
 * NOTE(review): on the two late error paths obd->obd_lu_dev still points at
 * the freed device; confirm the caller overwrites or ignores it.
 */
645 static struct lu_device *mds_device_alloc(const struct lu_env *env,
646                                           struct lu_device_type *t,
647                                           struct lustre_cfg *cfg)
648 {
649         struct mds_device        *m;
650         struct obd_device        *obd;
651         struct lu_device          *l;
652         int rc;
653
654         OBD_ALLOC_PTR(m);
655         if (m == NULL)
656                 return ERR_PTR(-ENOMEM);
657
658         md_device_init(&m->mds_md_dev, t);
659         l = &m->mds_md_dev.md_lu_dev;
660
661         obd = class_name2obd(lustre_cfg_string(cfg, 0));
662         LASSERT(obd != NULL);
663
664         l->ld_obd = obd;
665         /* set this lu_device to obd, because error handling need it */
666         obd->obd_lu_dev = l;
667
668         rc = lprocfs_obd_setup(obd, lprocfs_mds_obd_vars);
669         if (rc != 0) {
670                 mds_device_free(env, l);
671                 l = ERR_PTR(rc);
672                 return l;
673         }
674
675         rc = mds_start_ptlrpc_service(m);
676
677         if (rc != 0) {
                    /*
                     * lprocfs_obd_setup() succeeded above, so its state has
                     * to be released here as well, the same way
                     * mds_device_fini() does; mds_device_free() alone does
                     * not touch lprocfs.
                     */
                    lprocfs_obd_cleanup(obd);
678                 mds_device_free(env, l);
679                 l = ERR_PTR(rc);
680                 return l;
681         }
682         return l;
683 }
684
685 /* type constructor/destructor: mds_type_init, mds_type_fini */
/*
 * mds_device_type_ops below also references mds_type_start and
 * mds_type_stop; presumably all four are generated by this macro from the
 * "mds" prefix -- confirm against the LU_TYPE_INIT_FINI definition.
 */
686 LU_TYPE_INIT_FINI(mds, &mdt_thread_key);
687
/*
 * Device-type operations of the MDS layer: the mds_type_* hooks plus this
 * file's device alloc/free/fini methods.
 */
688 static struct lu_device_type_operations mds_device_type_ops = {
689         .ldto_init = mds_type_init,
690         .ldto_fini = mds_type_fini,
691
692         .ldto_start = mds_type_start,
693         .ldto_stop  = mds_type_stop,
694
695         .ldto_device_alloc = mds_device_alloc,
696         .ldto_device_free  = mds_device_free,
697         .ldto_device_fini  = mds_device_fini
698 };
699
/*
 * lu_device_type registered under LUSTRE_MDS_NAME by mds_mod_init(); tagged
 * as an MD device and wired to mds_device_type_ops.
 */
700 static struct lu_device_type mds_device_type = {
701         .ldt_tags     = LU_DEVICE_MD,
702         .ldt_name     = LUSTRE_MDS_NAME,
703         .ldt_ops      = &mds_device_type_ops,
704         .ldt_ctx_tags = LCT_MD_THREAD
705 };
706
/*
 * obd_ops passed to class_register_type(); only the owning module is set,
 * no obd methods are provided here.
 */
707 static struct obd_ops mds_obd_device_ops = {
708         .o_owner           = THIS_MODULE,
709 };
710
/*
 * Register the MDS obd type under LUSTRE_MDS_NAME.
 *
 * Before registering, honour the deprecated mdt_num_threads parameter: if it
 * is set and mds_num_threads is not, log a console notice and copy its value
 * into mds_num_threads.
 *
 * Returns the result of class_register_type().
 */
711 int mds_mod_init(void)
712 {
713         int rc;
714
715         if (mdt_num_threads != 0 && mds_num_threads == 0) {
716                 LCONSOLE_INFO("mdt_num_threads module parameter is deprecated, "
717                               "use mds_num_threads instead or unset both for "
718                               "dynamic thread startup\n");
719                 mds_num_threads = mdt_num_threads;
720         }
721
722         rc = class_register_type(&mds_obd_device_ops, NULL,
723                                  lprocfs_mds_module_vars, LUSTRE_MDS_NAME,
724                                  &mds_device_type);
725         return rc;
726 }
727
/* Undo mds_mod_init(): unregister the obd type named LUSTRE_MDS_NAME. */
728 void mds_mod_exit(void)
729 {
730         class_unregister_type(LUSTRE_MDS_NAME);
731 }